Changeset - eea19c23b741
[Not reviewed]
default
0 15 0
Søren Løvborg - 9 years ago 2017-02-23 18:00:56
sorenl@unity3d.com
cleanup: refer less to User.DEFAULT_USER

Down the road we might want to identify the default user in another
way than by username.
15 files changed with 23 insertions and 22 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/my_account.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.my_account
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
my account controller for Kallithea admin
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: August 20, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 
import formencode
 

	
 
from sqlalchemy import func
 
from formencode import htmlfill
 
from pylons import request, tmpl_context as c
 
from pylons.i18n.translation import _
 
from webob.exc import HTTPFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib import auth_modules
 
from kallithea.lib.auth import LoginRequired, NotAnonymous, AuthUser
 
from kallithea.lib.base import BaseController, render
 
from kallithea.lib.utils2 import generate_api_key, safe_int
 
from kallithea.lib.compat import json
 
from kallithea.model.db import Repository, UserEmailMap, User, UserFollowing
 
from kallithea.model.forms import UserForm, PasswordChangeForm
 
from kallithea.model.user import UserModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MyAccountController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('setting', 'settings', controller='admin/settings',
 
    #         path_prefix='/admin', name_prefix='admin_')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    def __before__(self):
 
        super(MyAccountController, self).__before__()
 

	
 
    def __load_data(self):
 
        c.user = User.get(request.authuser.user_id)
 
        if c.user.username == User.DEFAULT_USER:
 
        if c.user.is_default_user:
 
            h.flash(_("You can't edit this user since it's"
 
                      " crucial for entire application"), category='warning')
 
            raise HTTPFound(location=url('users'))
 

	
 
    def _load_my_repos_data(self, watched=False):
 
        if watched:
 
            admin = False
 
            repos_list = Session().query(Repository) \
 
                         .join(UserFollowing) \
 
                         .filter(UserFollowing.user_id ==
 
                                 request.authuser.user_id).all()
 
        else:
 
            admin = True
 
            repos_list = Session().query(Repository) \
 
                         .filter(Repository.owner_id ==
 
                                 request.authuser.user_id).all()
 

	
 
        repos_data = RepoModel().get_repos_as_dict(repos_list=repos_list,
 
                                                   admin=admin)
 
        #json used to render the grid
 
        return json.dumps(repos_data)
 

	
 
    def my_account(self):
 
        c.active = 'profile'
 
        self.__load_data()
 
        c.perm_user = AuthUser(user_id=request.authuser.user_id)
 
        managed_fields = auth_modules.get_managed_fields(c.user)
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        if 'hg.register.none' in def_user_perms:
 
            managed_fields.extend(['username', 'firstname', 'lastname', 'email'])
 

	
 
        c.readonly = lambda n: 'readonly' if n in managed_fields else None
 

	
 
        defaults = c.user.get_dict()
 
        update = False
 
        if request.POST:
 
            _form = UserForm(edit=True,
 
                             old_data={'user_id': request.authuser.user_id,
 
                                       'email': request.authuser.email})()
 
            form_result = {}
 
            try:
 
                post_data = dict(request.POST)
 
                post_data['new_password'] = ''
 
                post_data['password_confirmation'] = ''
 
                form_result = _form.to_python(post_data)
 
                # skip updating those attrs for my account
 
                skip_attrs = ['admin', 'active', 'extern_type', 'extern_name',
 
                              'new_password', 'password_confirmation',
 
                             ] + managed_fields
 

	
 
                UserModel().update(request.authuser.user_id, form_result,
 
                                   skip_attrs=skip_attrs)
 
                h.flash(_('Your account was updated successfully'),
 
                        category='success')
 
                Session().commit()
 
                update = True
 

	
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('admin/my_account/my_account.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during update of user %s') \
 
                        % form_result.get('username'), category='error')
 
        if update:
 
            raise HTTPFound(location='my_account')
 
        return htmlfill.render(
 
            render('admin/my_account/my_account.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def my_account_password(self):
 
        c.active = 'password'
 
        self.__load_data()
 

	
 
        managed_fields = auth_modules.get_managed_fields(c.user)
 
        c.can_change_password = 'password' not in managed_fields
 

	
 
        if request.POST and c.can_change_password:
 
            _form = PasswordChangeForm(request.authuser.username)()
 
            try:
 
                form_result = _form.to_python(request.POST)
 
                UserModel().update(request.authuser.user_id, form_result)
 
                Session().commit()
 
                h.flash(_("Successfully updated password"), category='success')
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('admin/my_account/my_account.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
kallithea/controllers/admin/users.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.users
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Users crud controller
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 
import formencode
 

	
 
from formencode import htmlfill
 
from pylons import request, tmpl_context as c, config
 
from pylons.i18n.translation import _
 
from sqlalchemy.sql.expression import func
 
from webob.exc import HTTPFound, HTTPNotFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib.exceptions import DefaultUserException, \
 
    UserOwnsReposException, UserCreationError
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import LoginRequired, HasPermissionAnyDecorator, \
 
    AuthUser
 
from kallithea.lib import auth_modules
 
from kallithea.lib.base import BaseController, render
 
from kallithea.model.api_key import ApiKeyModel
 

	
 
from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
 
from kallithea.model.forms import UserForm, CustomDefaultPermissionsForm
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.compat import json
 
from kallithea.lib.utils2 import datetime_to_time, safe_int, generate_api_key
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UsersController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def __before__(self):
 
        super(UsersController, self).__before__()
 
        c.available_permissions = config['available_permissions']
 

	
 
    def index(self, format='html'):
 
        c.users_list = User.query().order_by(User.username) \
 
                        .filter(User.username != User.DEFAULT_USER) \
 
                        .filter_by(is_default_user=False) \
 
                        .order_by(func.lower(User.username)) \
 
                        .all()
 

	
 
        users_data = []
 
        total_records = len(c.users_list)
 
        _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        grav_tmpl = '<div class="gravatar">%s</div>'
 

	
 
        username = lambda user_id, username: (
 
                template.get_def("user_name")
 
                .render(user_id, username, _=_, h=h, c=c))
 

	
 
        user_actions = lambda user_id, username: (
 
                template.get_def("user_actions")
 
                .render(user_id, username, _=_, h=h, c=c))
 

	
 
        for user in c.users_list:
 
            users_data.append({
 
                "gravatar": grav_tmpl % h.gravatar(user.email, size=20),
 
                "raw_name": user.username,
 
                "username": username(user.user_id, user.username),
 
                "firstname": h.escape(user.name),
 
                "lastname": h.escape(user.lastname),
 
                "last_login": h.fmt_date(user.last_login),
 
                "last_login_raw": datetime_to_time(user.last_login),
 
                "active": h.boolicon(user.active),
 
                "admin": h.boolicon(user.admin),
 
                "extern_type": user.extern_type,
 
                "extern_name": user.extern_name,
 
                "action": user_actions(user.user_id, user.username),
 
            })
 

	
 
        c.data = json.dumps({
 
            "totalRecords": total_records,
 
            "startIndex": 0,
 
            "sort": None,
 
            "dir": "asc",
 
            "records": users_data
 
        })
 

	
 
        return render('admin/users/users.html')
 

	
 
    def create(self):
 
        c.default_extern_type = User.DEFAULT_AUTH_TYPE
 
        c.default_extern_name = ''
 
        user_model = UserModel()
 
        user_form = UserForm()()
 
        try:
 
            form_result = user_form.to_python(dict(request.POST))
 
            user = user_model.create(form_result)
 
            action_logger(request.authuser, 'admin_created_user:%s' % user.username,
 
                          None, request.ip_addr, self.sa)
 
            h.flash(_('Created user %s') % user.username,
 
                    category='success')
 
            Session().commit()
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('admin/users/user_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except UserCreationError as e:
 
            h.flash(e, 'error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during creation of user %s') \
 
                    % request.POST.get('username'), category='error')
 
        raise HTTPFound(location=url('edit_user', id=user.user_id))
 

	
 
    def new(self, format='html'):
 
        c.default_extern_type = User.DEFAULT_AUTH_TYPE
 
        c.default_extern_name = ''
 
        return render('admin/users/user_add.html')
 

	
 
    def update(self, id):
 
        user_model = UserModel()
 
        user = user_model.get(id)
 
        _form = UserForm(edit=True, old_data={'user_id': id,
 
                                              'email': user.email})()
 
        form_result = {}
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            skip_attrs = ['extern_type', 'extern_name',
 
                         ] + auth_modules.get_managed_fields(user)
 

	
 
            user_model.update(id, form_result, skip_attrs=skip_attrs)
 
            usr = form_result['username']
 
            action_logger(request.authuser, 'admin_updated_user:%s' % usr,
 
                          None, request.ip_addr, self.sa)
 
            h.flash(_('User updated successfully'), category='success')
 
            Session().commit()
 
        except formencode.Invalid as errors:
kallithea/controllers/api/api.py
Show inline comments
 
@@ -495,193 +495,193 @@ class ApiController(JSONRPCController):
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_server_info(self):
 
        """
 
        return server info, including Kallithea version and installed packages
 

	
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            'modules': [<module name>,...]
 
            'py_version': <python version>,
 
            'platform': <platform type>,
 
            'kallithea_version': <kallithea version>
 
          }
 
          error :  null
 
        """
 
        return Setting.get_server_info()
 

	
 
    def get_user(self, userid=Optional(OAttr('apiuser'))):
 
        """
 
        Gets a user by username or user_id, Returns empty result if user is
 
        not found. If userid param is skipped it is set to id of user who is
 
        calling this method. This command can be executed only using api_key
 
        belonging to user with admin rights, or regular users that cannot
 
        specify different userid than theirs
 

	
 
        :param userid: user to get data for
 
        :type userid: Optional(str or int)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: None if user does not exist or
 
                    {
 
                        "user_id" :     "<user_id>",
 
                        "api_key" :     "<api_key>",
 
                        "api_keys":     "[<list of all API keys including additional ones>]"
 
                        "username" :    "<username>",
 
                        "firstname":    "<firstname>",
 
                        "lastname" :    "<lastname>",
 
                        "email" :       "<email>",
 
                        "emails":       "[<list of all emails including additional ones>]",
 
                        "ip_addresses": "[<ip_address_for_user>,...]",
 
                        "active" :      "<bool: user active>",
 
                        "admin" :       "<bool: user is admin>",
 
                        "extern_name" : "<extern_name>",
 
                        "extern_type" : "<extern type>
 
                        "last_login":   "<last_login>",
 
                        "permissions": {
 
                            "global": ["hg.create.repository",
 
                                       "repository.read",
 
                                       "hg.register.manual_activate"],
 
                            "repositories": {"repo1": "repository.none"},
 
                            "repositories_groups": {"Group1": "group.read"}
 
                         },
 
                    }
 

	
 
            error:  null
 

	
 
        """
 
        if not HasPermissionAny('hg.admin')():
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != request.authuser.user_id:
 
                raise JSONRPCError(
 
                    'userid is not the same as your user'
 
                )
 

	
 
        if isinstance(userid, Optional):
 
            userid = request.authuser.user_id
 

	
 
        user = get_user_or_error(userid)
 
        data = user.get_api_data()
 
        data['permissions'] = AuthUser(user_id=user.user_id).permissions
 
        return data
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_users(self):
 
        """
 
        Lists all existing users. This command can be executed only using api_key
 
        belonging to user with admin rights.
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: [<user_object>, ...]
 
            error:  null
 
        """
 

	
 
        return [
 
            user.get_api_data()
 
            for user in User.query()
 
                .order_by(User.username)
 
                .filter(User.username != User.DEFAULT_USER)
 
                .filter_by(is_default_user=False)
 
        ]
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def create_user(self, username, email, password=Optional(''),
 
                    firstname=Optional(''), lastname=Optional(''),
 
                    active=Optional(True), admin=Optional(False),
 
                    extern_type=Optional(User.DEFAULT_AUTH_TYPE),
 
                    extern_name=Optional('')):
 
        """
 
        Creates new user. Returns new user object. This command can
 
        be executed only using api_key belonging to user with admin rights.
 

	
 
        :param username: new username
 
        :type username: str or int
 
        :param email: email
 
        :type email: str
 
        :param password: password
 
        :type password: Optional(str)
 
        :param firstname: firstname
 
        :type firstname: Optional(str)
 
        :param lastname: lastname
 
        :type lastname: Optional(str)
 
        :param active: active
 
        :type active: Optional(bool)
 
        :param admin: admin
 
        :type admin: Optional(bool)
 
        :param extern_name: name of extern
 
        :type extern_name: Optional(str)
 
        :param extern_type: extern_type
 
        :type extern_type: Optional(str)
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "created new user `<username>`",
 
                      "user": <user_obj>
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "user `<username>` already exist"
 
            or
 
            "email `<email>` already exist"
 
            or
 
            "failed to create user `<username>`"
 
          }
 

	
 
        """
 

	
 
        if User.get_by_username(username):
 
            raise JSONRPCError("user `%s` already exist" % (username,))
 

	
 
        if User.get_by_email(email):
 
            raise JSONRPCError("email `%s` already exist" % (email,))
 

	
 
        try:
 
            user = UserModel().create_or_update(
 
                username=Optional.extract(username),
 
                password=Optional.extract(password),
 
                email=Optional.extract(email),
 
                firstname=Optional.extract(firstname),
 
                lastname=Optional.extract(lastname),
 
                active=Optional.extract(active),
 
                admin=Optional.extract(admin),
 
                extern_type=Optional.extract(extern_type),
 
                extern_name=Optional.extract(extern_name)
 
            )
 
            Session().commit()
 
            return dict(
 
                msg='created new user `%s`' % username,
 
                user=user.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create user `%s`' % (username,))
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def update_user(self, userid, username=Optional(None),
 
                    email=Optional(None), password=Optional(None),
 
                    firstname=Optional(None), lastname=Optional(None),
 
                    active=Optional(None), admin=Optional(None),
 
                    extern_type=Optional(None), extern_name=Optional(None)):
 
        """
 
        updates given user if such user exists. This command can
 
        be executed only using api_key belonging to user with admin rights.
 

	
 
        :param userid: userid to update
 
        :type userid: str or int
 
        :param username: new username
 
        :type username: str or int
kallithea/lib/auth_modules/auth_internal.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_modules.auth_internal
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea authentication plugin for built in internal auth
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Nov 17, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 

	
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import formatted_json, hybrid_property
 
from kallithea.model.db import User
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaAuthPluginBase):
 
    def __init__(self):
 
        pass
 

	
 
    @hybrid_property
 
    def name(self):
 
        # Also found as kallithea.lib.model.db.User.DEFAULT_AUTH_TYPE
 
        return 'internal'
 

	
 
    def settings(self):
 
        return []
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.register.auto_activate' in def_user_perms
 

	
 
    def accepts(self, user, accepts_empty=True):
 
        """
 
        Custom accepts for this auth that doesn't accept empty users. We
 
        know that user exists in database.
 
        """
 
        return super(KallitheaAuthPlugin, self).accepts(user,
 
                                                        accepts_empty=False)
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        if not userobj:
 
            log.debug('userobj was:%s skipping', userobj)
 
            return None
 
        if userobj.extern_type != self.name:
 
            log.warning("userobj:%s extern_type mismatch got:`%s` expected:`%s`",
 
                     userobj, userobj.extern_type, self.name)
 
            return None
 
        if not username:
 
            log.debug('Empty username - skipping...')
 
            return None
 

	
 
        user_data = {
 
            "username": userobj.username,
 
            "firstname": userobj.firstname,
 
            "lastname": userobj.lastname,
 
            "groups": [],
 
            "email": userobj.email,
 
            "admin": userobj.admin,
 
            "active": userobj.active,
 
            "active_from_extern": userobj.active,
 
            "extern_name": userobj.user_id,
 
        }
 

	
 
        log.debug(formatted_json(user_data))
 
        if userobj.active:
 
            from kallithea.lib import auth
 
            password_match = auth.check_password(password, userobj.password)
 
            if userobj.username == User.DEFAULT_USER and userobj.active:
 
            if userobj.is_default_user and userobj.active:
 
                log.info('user %s authenticated correctly as anonymous user',
 
                         username)
 
                return user_data
 

	
 
            elif userobj.username == username and password_match:
 
                log.info('user %s authenticated correctly', user_data['username'])
 
                return user_data
 
            log.error("user %s had a bad password", username)
 
            return None
 
        else:
 
            log.warning('user %s tried auth but is disabled', username)
 
            return None
 

	
 
    def get_managed_fields(self):
 
        # Note: 'username' should only be editable (at least for user) if self registration is enabled
 
        return []
kallithea/lib/db_manage.py
Show inline comments
 
@@ -47,195 +47,193 @@ from sqlalchemy.engine import create_eng
 
from kallithea.model.repo_group import RepoGroupModel
 
#from kallithea.model import meta
 
from kallithea.model.meta import Session, Base
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.permission import PermissionModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def notify(msg):
 
    """
 
    Notification for migrations messages
 
    """
 
    ml = len(msg) + (4 * 2)
 
    print('\n%s\n*** %s ***\n%s' % ('*' * ml, msg, '*' * ml)).upper()
 

	
 

	
 
class DbManage(object):
 
    def __init__(self, log_sql, dbconf, root, tests=False, SESSION=None, cli_args=None):
 
        self.dbname = dbconf.split('/')[-1]
 
        self.tests = tests
 
        self.root = root
 
        self.dburi = dbconf
 
        self.log_sql = log_sql
 
        self.db_exists = False
 
        self.cli_args = cli_args or {}
 
        self.init_db(SESSION=SESSION)
 

	
 
    def _ask_ok(self, msg):
 
        """Invoke ask_ok unless the force_ask option provides the answer"""
 
        force_ask = self.cli_args.get('force_ask')
 
        if force_ask is not None:
 
            return force_ask
 
        return ask_ok(msg)
 

	
 
    def init_db(self, SESSION=None):
 
        if SESSION:
 
            self.sa = SESSION
 
        else:
 
            #init new sessions
 
            engine = create_engine(self.dburi, echo=self.log_sql)
 
            init_model(engine)
 
            self.sa = Session()
 

	
 
    def create_tables(self, override=False):
 
        """
 
        Create a auth database
 
        """
 

	
 
        log.info("Any existing database is going to be destroyed")
 
        if self.tests:
 
            destroy = True
 
        else:
 
            destroy = self._ask_ok('Are you sure to destroy old database ? [y/n]')
 
        if not destroy:
 
            print 'Nothing done.'
 
            sys.exit(0)
 
        if destroy:
 
            Base.metadata.drop_all()
 

	
 
        checkfirst = not override
 
        Base.metadata.create_all(checkfirst=checkfirst)
 

	
 
        # Create an Alembic configuration and generate the version table,
 
        # "stamping" it with the most recent Alembic migration revision, to
 
        # tell Alembic that all the schema upgrades are already in effect.
 
        alembic_cfg = alembic.config.Config()
 
        alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
 
        alembic_cfg.set_main_option('sqlalchemy.url', self.dburi)
 
        # This command will give an error in an Alembic multi-head scenario,
 
        # but in practice, such a scenario should not come up during database
 
        # creation, even during development.
 
        alembic.command.stamp(alembic_cfg, 'head')
 

	
 
        log.info('Created tables for %s', self.dbname)
 

	
 
    def fix_repo_paths(self):
 
        """
 
        Fixes a old kallithea version path into new one without a '*'
 
        """
 

	
 
        paths = self.sa.query(Ui) \
 
                .filter(Ui.ui_key == '/') \
 
                .scalar()
 

	
 
        paths.ui_value = paths.ui_value.replace('*', '')
 

	
 
        self.sa.add(paths)
 
        self.sa.commit()
 

	
 
    def fix_default_user(self):
 
        """
 
        Fixes a old default user with some 'nicer' default values,
 
        used mostly for anonymous access
 
        """
 
        def_user = self.sa.query(User) \
 
                .filter(User.username == User.DEFAULT_USER) \
 
                .one()
 
        def_user = self.sa.query(User).filter_by(is_default_user=True).one()
 

	
 
        def_user.name = 'Anonymous'
 
        def_user.lastname = 'User'
 
        def_user.email = 'anonymous@kallithea-scm.org'
 

	
 
        self.sa.add(def_user)
 
        self.sa.commit()
 

	
 
    def fix_settings(self):
 
        """
 
        Fixes kallithea settings adds ga_code key for google analytics
 
        """
 

	
 
        hgsettings3 = Setting('ga_code', '')
 

	
 
        self.sa.add(hgsettings3)
 
        self.sa.commit()
 

	
 
    def admin_prompt(self, second=False):
 
        if not self.tests:
 
            import getpass
 

	
 
            username = self.cli_args.get('username')
 
            password = self.cli_args.get('password')
 
            email = self.cli_args.get('email')
 

	
 
            def get_password():
 
                password = getpass.getpass('Specify admin password '
 
                                           '(min 6 chars):')
 
                confirm = getpass.getpass('Confirm password:')
 

	
 
                if password != confirm:
 
                    log.error('passwords mismatch')
 
                    return False
 
                if len(password) < 6:
 
                    log.error('password is to short use at least 6 characters')
 
                    return False
 

	
 
                return password
 
            if username is None:
 
                username = raw_input('Specify admin username:')
 
            if password is None:
 
                password = get_password()
 
                if not password:
 
                    #second try
 
                    password = get_password()
 
                    if not password:
 
                        sys.exit()
 
            if email is None:
 
                email = raw_input('Specify admin email:')
 
            self.create_user(username, password, email, True)
 
        else:
 
            log.info('creating admin and regular test users')
 
            from kallithea.tests.base import TEST_USER_ADMIN_LOGIN, \
 
            TEST_USER_ADMIN_PASS, TEST_USER_ADMIN_EMAIL, \
 
            TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, \
 
            TEST_USER_REGULAR_EMAIL, TEST_USER_REGULAR2_LOGIN, \
 
            TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR2_EMAIL
 

	
 
            self.create_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS,
 
                             TEST_USER_ADMIN_EMAIL, True)
 

	
 
            self.create_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
 
                             TEST_USER_REGULAR_EMAIL, False)
 

	
 
            self.create_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS,
 
                             TEST_USER_REGULAR2_EMAIL, False)
 

	
 
    def create_ui_settings(self, repo_store_path):
 
        """
 
        Creates ui settings, fills out hooks
 
        """
 

	
 
        #HOOKS
 
        hooks1_key = Ui.HOOK_UPDATE
 
        hooks1_ = self.sa.query(Ui) \
 
            .filter(Ui.ui_key == hooks1_key).scalar()
 

	
 
        hooks1 = Ui() if hooks1_ is None else hooks1_
 
        hooks1.ui_section = 'hooks'
 
        hooks1.ui_key = hooks1_key
 
        hooks1.ui_value = 'hg update >&2'
 
        hooks1.ui_active = False
 
        self.sa.add(hooks1)
 

	
 
        hooks2_key = Ui.HOOK_REPO_SIZE
 
        hooks2_ = self.sa.query(Ui) \
 
            .filter(Ui.ui_key == hooks2_key).scalar()
 
        hooks2 = Ui() if hooks2_ is None else hooks2_
 
        hooks2.ui_section = 'hooks'
 
        hooks2.ui_key = hooks2_key
 
        hooks2.ui_value = 'python:kallithea.lib.hooks.repo_size'
 
        self.sa.add(hooks2)
 

	
 
        hooks3 = Ui()
 
        hooks3.ui_section = 'hooks'
kallithea/lib/middleware/simplegit.py
Show inline comments
 
@@ -29,193 +29,193 @@ Original author and date, and relevant c
 

	
 

	
 
import os
 
import re
 
import logging
 
import traceback
 

	
 
from paste.httpheaders import REMOTE_USER, AUTH_TYPE
 
from webob.exc import HTTPNotFound, HTTPForbidden, HTTPInternalServerError, \
 
    HTTPNotAcceptable
 
from kallithea.model.db import User, Ui
 

	
 
from kallithea.lib.utils2 import safe_str, safe_unicode, fix_PATH, get_server_url, \
 
    _set_extras
 
from kallithea.lib.base import BaseVCSController, WSGIResultCloseCallback
 
from kallithea.lib.utils import make_ui, is_valid_repo
 
from kallithea.lib.exceptions import HTTPLockedRC
 
from kallithea.lib.hooks import pre_pull
 
from kallithea.lib import auth_modules
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
GIT_PROTO_PAT = re.compile(r'^/(.+)/(info/refs|git-upload-pack|git-receive-pack)')
 

	
 

	
 
def is_git(environ):
 
    path_info = environ['PATH_INFO']
 
    isgit_path = GIT_PROTO_PAT.match(path_info)
 
    log.debug('pathinfo: %s detected as Git %s',
 
        path_info, isgit_path is not None
 
    )
 
    return isgit_path
 

	
 

	
 
class SimpleGit(BaseVCSController):
 

	
 
    def _handle_request(self, environ, start_response):
 
        if not is_git(environ):
 
            return self.application(environ, start_response)
 

	
 
        ip_addr = self._get_ip_addr(environ)
 
        self._git_first_op = False
 
        # skip passing error to error controller
 
        environ['pylons.status_code_redirect'] = True
 

	
 
        #======================================================================
 
        # EXTRACT REPOSITORY NAME FROM ENV
 
        #======================================================================
 
        try:
 
            str_repo_name = self.__get_repository(environ)
 
            repo_name = safe_unicode(str_repo_name)
 
            log.debug('Extracted repo name is %s', repo_name)
 
        except Exception as e:
 
            log.error('error extracting repo_name: %r', e)
 
            return HTTPInternalServerError()(environ, start_response)
 

	
 
        # quick check if that dir exists...
 
        if not is_valid_repo(repo_name, self.basepath, 'git'):
 
            return HTTPNotFound()(environ, start_response)
 

	
 
        #======================================================================
 
        # GET ACTION PULL or PUSH
 
        #======================================================================
 
        action = self.__get_action(environ)
 

	
 
        #======================================================================
 
        # CHECK PERMISSIONS
 
        #======================================================================
 
        user, response_app = self._authorize(environ, start_response, action, repo_name, ip_addr)
 
        if response_app is not None:
 
            return response_app(environ, start_response)
 

	
 
        # extras are injected into UI object and later available
 
        # in hooks executed by kallithea
 
        from kallithea import CONFIG
 
        server_url = get_server_url(environ)
 
        extras = {
 
            'ip': ip_addr,
 
            'username': user.username,
 
            'action': action,
 
            'repository': repo_name,
 
            'scm': 'git',
 
            'config': CONFIG['__file__'],
 
            'server_url': server_url,
 
            'make_lock': None,
 
            'locked_by': [None, None]
 
        }
 

	
 
        #===================================================================
 
        # GIT REQUEST HANDLING
 
        #===================================================================
 
        repo_path = os.path.join(safe_str(self.basepath),str_repo_name)
 
        log.debug('Repository path is %s', repo_path)
 

	
 
        # CHECK LOCKING only if it's not ANONYMOUS USER
 
        if user.username != User.DEFAULT_USER:
 
        if not user.is_default_user:
 
            log.debug('Checking locking on repository')
 
            (make_lock,
 
             locked,
 
             locked_by) = self._check_locking_state(
 
                            environ=environ, action=action,
 
                            repo=repo_name, user_id=user.user_id
 
                       )
 
            # store the make_lock for later evaluation in hooks
 
            extras.update({'make_lock': make_lock,
 
                           'locked_by': locked_by})
 

	
 
        fix_PATH()
 
        log.debug('HOOKS extras is %s', extras)
 
        baseui = make_ui('db')
 
        self.__inject_extras(repo_path, baseui, extras)
 

	
 
        try:
 
            self._handle_githooks(repo_name, action, baseui, environ)
 
            log.info('%s action on Git repo "%s" by "%s" from %s',
 
                     action, str_repo_name, safe_str(user.username), ip_addr)
 
            app = self.__make_app(repo_name, repo_path, extras)
 
            result = app(environ, start_response)
 
            if action == 'push':
 
                result = WSGIResultCloseCallback(result,
 
                    lambda: self._invalidate_cache(repo_name))
 
            return result
 
        except HTTPLockedRC as e:
 
            log.debug('Locked, response %s: %s', e.code, e.title)
 
            return e(environ, start_response)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            return HTTPInternalServerError()(environ, start_response)
 

	
 
    def __make_app(self, repo_name, repo_path, extras):
 
        """
 
        Make an wsgi application using dulserver
 

	
 
        :param repo_name: name of the repository
 
        :param repo_path: full path to the repository
 
        """
 

	
 
        from kallithea.lib.middleware.pygrack import make_wsgi_app
 
        app = make_wsgi_app(
 
            repo_root=safe_str(self.basepath),
 
            repo_name=safe_unicode(repo_name),
 
            extras=extras,
 
        )
 
        return app
 

	
 
    def __get_repository(self, environ):
 
        """
 
        Gets repository name out of PATH_INFO header
 

	
 
        :param environ: environ where PATH_INFO is stored
 
        """
 
        try:
 
            environ['PATH_INFO'] = self._get_by_id(environ['PATH_INFO'])
 
            repo_name = GIT_PROTO_PAT.match(environ['PATH_INFO']).group(1)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
        return repo_name
 

	
 
    def __get_action(self, environ):
 
        """
 
        Maps git request commands into a pull or push command.
 

	
 
        :param environ:
 
        """
 
        service = environ['QUERY_STRING'].split('=')
 

	
 
        if len(service) > 1:
 
            service_cmd = service[1]
 
            mapping = {
 
                'git-receive-pack': 'push',
 
                'git-upload-pack': 'pull',
 
            }
 
            op = mapping[service_cmd]
 
            self._git_stored_op = op
 
            return op
 
        else:
 
            # try to fallback to stored variable as we don't know if the last
 
            # operation is pull/push
 
            op = getattr(self, '_git_stored_op', 'pull')
 
        return op
 

	
 
    def _handle_githooks(self, repo_name, action, baseui, environ):
 
        """
 
        Handles pull action, push is handled by post-receive hook
 
        """
 
        from kallithea.lib.hooks import log_pull_action
 
        service = environ['QUERY_STRING'].split('=')
 

	
 
        if len(service) < 2:
 
            return
kallithea/lib/middleware/simplehg.py
Show inline comments
 
@@ -34,193 +34,193 @@ import traceback
 

	
 
from webob.exc import HTTPNotFound, HTTPForbidden, HTTPInternalServerError, \
 
    HTTPNotAcceptable, HTTPBadRequest
 
from kallithea.model.db import User
 

	
 
from kallithea.lib.utils2 import safe_str, safe_unicode, fix_PATH, get_server_url, \
 
    _set_extras
 
from kallithea.lib.base import BaseVCSController, WSGIResultCloseCallback
 
from kallithea.lib.utils import make_ui, is_valid_repo, ui_sections
 
from kallithea.lib.vcs.utils.hgcompat import RepoError, hgweb_mod
 
from kallithea.lib.exceptions import HTTPLockedRC
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def is_mercurial(environ):
 
    """
 
    Returns True if request's target is mercurial server - header
 
    ``HTTP_ACCEPT`` of such request would start with ``application/mercurial``.
 
    """
 
    http_accept = environ.get('HTTP_ACCEPT')
 
    path_info = environ['PATH_INFO']
 
    if http_accept and http_accept.startswith('application/mercurial'):
 
        ishg_path = True
 
    else:
 
        ishg_path = False
 

	
 
    log.debug('pathinfo: %s detected as Mercurial %s',
 
        path_info, ishg_path
 
    )
 
    return ishg_path
 

	
 
class SimpleHg(BaseVCSController):
 

	
 
    def _handle_request(self, environ, start_response):
 
        if not is_mercurial(environ):
 
            return self.application(environ, start_response)
 

	
 
        ip_addr = self._get_ip_addr(environ)
 
        # skip passing error to error controller
 
        environ['pylons.status_code_redirect'] = True
 

	
 
        #======================================================================
 
        # EXTRACT REPOSITORY NAME FROM ENV
 
        #======================================================================
 
        try:
 
            str_repo_name = environ['REPO_NAME'] = self.__get_repository(environ)
 
            assert isinstance(str_repo_name, str), str_repo_name
 
            repo_name = safe_unicode(str_repo_name)
 
            assert safe_str(repo_name) == str_repo_name, (str_repo_name, repo_name)
 
            log.debug('Extracted repo name is %s', repo_name)
 
        except Exception as e:
 
            log.error('error extracting repo_name: %r', e)
 
            return HTTPInternalServerError()(environ, start_response)
 

	
 
        # quick check if that dir exists...
 
        if not is_valid_repo(repo_name, self.basepath, 'hg'):
 
            return HTTPNotFound()(environ, start_response)
 

	
 
        #======================================================================
 
        # GET ACTION PULL or PUSH
 
        #======================================================================
 
        try:
 
            action = self.__get_action(environ)
 
        except HTTPBadRequest as e:
 
            return e(environ, start_response)
 

	
 
        #======================================================================
 
        # CHECK PERMISSIONS
 
        #======================================================================
 
        user, response_app = self._authorize(environ, start_response, action, repo_name, ip_addr)
 
        if response_app is not None:
 
            return response_app(environ, start_response)
 

	
 
        # extras are injected into mercurial UI object and later available
 
        # in hg hooks executed by kallithea
 
        from kallithea import CONFIG
 
        server_url = get_server_url(environ)
 
        extras = {
 
            'ip': ip_addr,
 
            'username': user.username,
 
            'action': action,
 
            'repository': repo_name,
 
            'scm': 'hg',
 
            'config': CONFIG['__file__'],
 
            'server_url': server_url,
 
            'make_lock': None,
 
            'locked_by': [None, None]
 
        }
 
        #======================================================================
 
        # MERCURIAL REQUEST HANDLING
 
        #======================================================================
 
        repo_path = os.path.join(safe_str(self.basepath), str_repo_name)
 
        log.debug('Repository path is %s', repo_path)
 

	
 
        # CHECK LOCKING only if it's not ANONYMOUS USER
 
        if user.username != User.DEFAULT_USER:
 
        if not user.is_default_user:
 
            log.debug('Checking locking on repository')
 
            (make_lock,
 
             locked,
 
             locked_by) = self._check_locking_state(
 
                            environ=environ, action=action,
 
                            repo=repo_name, user_id=user.user_id
 
                       )
 
            # store the make_lock for later evaluation in hooks
 
            extras.update({'make_lock': make_lock,
 
                           'locked_by': locked_by})
 

	
 
        fix_PATH()
 
        log.debug('HOOKS extras is %s', extras)
 
        baseui = make_ui('db')
 
        self.__inject_extras(repo_path, baseui, extras)
 

	
 
        try:
 
            log.info('%s action on Mercurial repo "%s" by "%s" from %s',
 
                     action, str_repo_name, safe_str(user.username), ip_addr)
 
            app = self.__make_app(repo_path, baseui, extras)
 
            result = app(environ, start_response)
 
            if action == 'push':
 
                result = WSGIResultCloseCallback(result,
 
                    lambda: self._invalidate_cache(repo_name))
 
            return result
 
        except RepoError as e:
 
            if str(e).find('not found') != -1:
 
                return HTTPNotFound()(environ, start_response)
 
        except HTTPLockedRC as e:
 
            # Before Mercurial 3.6, lock exceptions were caught here
 
            log.debug('Locked, response %s: %s', e.code, e.title)
 
            return e(environ, start_response)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            return HTTPInternalServerError()(environ, start_response)
 

	
 
    def __make_app(self, repo_name, baseui, extras):
 
        """
 
        Make an wsgi application using hgweb, and inject generated baseui
 
        instance, additionally inject some extras into ui object
 
        """
 
        class HgWebWrapper(hgweb_mod.hgweb):
 
            # Work-around for Mercurial 3.6+ causing lock exceptions to be
 
            # thrown late
 
            def _runwsgi(self, req, repo):
 
                try:
 
                    return super(HgWebWrapper, self)._runwsgi(req, repo)
 
                except HTTPLockedRC as e:
 
                    log.debug('Locked, response %s: %s', e.code, e.title)
 
                    req.respond(e.status, 'text/plain')
 
                    return ''
 

	
 
        return HgWebWrapper(repo_name, name=repo_name, baseui=baseui)
 

	
 
    def __get_repository(self, environ):
 
        """
 
        Gets repository name out of PATH_INFO header
 

	
 
        :param environ: environ where PATH_INFO is stored
 
        """
 
        try:
 
            environ['PATH_INFO'] = self._get_by_id(environ['PATH_INFO'])
 
            repo_name = '/'.join(environ['PATH_INFO'].split('/')[1:])
 
            if repo_name.endswith('/'):
 
                repo_name = repo_name.rstrip('/')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
        return repo_name
 

	
 
    def __get_action(self, environ):
 
        """
 
        Maps mercurial request commands into a pull or push command.
 

	
 
        Raises HTTPBadRequest if the request environment doesn't look like a hg client.
 
        """
 
        mapping = {'unbundle': 'push',
 
                   'pushkey': 'push'}
 
        for qry in environ['QUERY_STRING'].split('&'):
 
            if qry.startswith('cmd'):
 
                cmd = qry.split('=')[-1]
 
                return mapping.get(cmd, 'pull')
 

	
 
        # Note: the client doesn't get the helpful error message
 
        raise HTTPBadRequest('Unable to detect pull/push action! Are you using non standard command or client?')
 

	
 
    def __inject_extras(self, repo_path, baseui, extras=None):
 
        """
 
        Injects some extra params into baseui instance
 

	
 
        also overwrites global settings with those takes from local hgrc file
 

	
 
        :param baseui: baseui instance
 
        :param extras: dict with extra params to put into baseui
 
        """
kallithea/lib/utils2.py
Show inline comments
 
@@ -453,193 +453,193 @@ def uri_filter(uri):
 
def credentials_filter(uri):
 
    """
 
    Returns a url with removed credentials
 

	
 
    :param uri:
 
    """
 

	
 
    uri = uri_filter(uri)
 
    #check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
 

	
 

	
 
def get_clone_url(uri_tmpl, qualified_home_url, repo_name, repo_id, **override):
 
    parsed_url = urlobject.URLObject(qualified_home_url)
 
    decoded_path = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': '',
 
        'netloc': parsed_url.netloc+decoded_path,  # path if we use proxy-prefix
 
        'prefix': decoded_path,
 
        'repo': repo_name,
 
        'repoid': str(repo_id)
 
    }
 
    args.update(override)
 
    args['user'] = urllib.quote(safe_str(args['user']))
 

	
 
    for k, v in args.items():
 
        uri_tmpl = uri_tmpl.replace('{%s}' % k, v)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(uri_tmpl)
 
    url = url_obj.with_netloc(url_obj.netloc.lstrip('@'))
 

	
 
    return safe_unicode(url)
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset if this changeset doesn't exists for a
 
    repo it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    from kallithea.lib.vcs.backends.base import BaseRepository
 
    from kallithea.lib.vcs.exceptions import RepositoryError
 
    from kallithea.lib.vcs.backends.base import EmptyChangeset
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass an Repository '
 
                        'object as first argument got %s', type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, basestring):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    from kallithea.model.db import User
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
        user = User.get_by_username(name, case_insensitive=True)
 
        if user is not None and user.username != User.DEFAULT_USER:
 
        if user is not None and not user.is_default_user:
 
            result.add(user)
 
    return result
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def fix_PATH(os_=None):
 
    """
 
    Get current active python path, and append it to PATH variable to fix issues
 
    of subprocess calls and different python versions
 
    """
 
    if os_ is None:
 
        import os
 
    else:
 
        os = os_
 

	
 
    cur_path = os.path.split(sys.executable)[0]
 
    if not os.environ['PATH'].startswith(cur_path):
 
        os.environ['PATH'] = '%s:%s' % (cur_path, os.environ['PATH'])
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    from sqlalchemy.engine import url as sa_url
 
    from sqlalchemy.exc import ArgumentError
 
    try:
 
        _url = sa_url.make_url(engine or '')
 
    except ArgumentError:
 
        return engine
 
    if _url.password:
 
        _url.password = 'XXXXX'
 
    return str(_url)
 

	
 

	
 
def get_server_url(environ):
 
    req = webob.Request(environ)
 
    return req.host_url + req.script_name
 

	
 

	
 
def _extract_extras(env=None):
 
    """
 
    Extracts the Kallithea extras data from os.environ, and wraps it into named
 
    AttributeDict object
 
    """
 
    if not env:
 
        env = os.environ
 

	
 
    try:
 
        extras = json.loads(env['KALLITHEA_EXTRAS'])
 
    except KeyError:
 
        extras = {}
 

	
 
    try:
 
        for k in ['username', 'repository', 'locked_by', 'scm', 'make_lock',
 
                  'action', 'ip']:
 
            extras[k]
 
    except KeyError as e:
 
        raise Exception('Missing key %s in os.environ %s' % (e, extras))
 

	
 
    return AttributeDict(extras)
 

	
 

	
 
def _set_extras(extras):
 
    # RC_SCM_DATA can probably be removed in the future, but for compatibility now...
 
    os.environ['KALLITHEA_EXTRAS'] = os.environ['RC_SCM_DATA'] = json.dumps(extras)
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets kallithea user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    from pylons import tmpl_context
 
    if hasattr(tmpl_context, 'authuser'):
 
        return tmpl_context.authuser
 

	
 
    return None
 

	
 

	
 
class OptionalAttr(object):
 
    """
 
    Special Optional Option that defines other attribute. Example::
 

	
 
        def test(apiuser, userid=Optional(OAttr('apiuser')):
 
            user = Optional.extract(userid)
 
            # calls
 

	
 
    """
 

	
 
    def __init__(self, attr_name):
 
        self.attr_name = attr_name
 

	
kallithea/model/db.py
Show inline comments
 
@@ -437,235 +437,238 @@ class User(Base, BaseDbModel):
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    inherit_default_permissions = Column(Boolean(), nullable=False, default=True)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    notifications = relationship('UserNotification', cascade='all')
 
    # notifications assigned to this user
 
    user_created_notifications = relationship('Notification', cascade='all')
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    #extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    #extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 

	
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user==self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user==self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @hybrid_property
 
    def is_default_user(self):
 
        return self.username == User.DEFAULT_USER
 

	
 
    @property
 
    def AuthUser(self):
 
        """
 
        Returns instance of AuthUser for this user
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        return AuthUser(dbuser=self)
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.user_id, self.username)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(User, cls).guess_instance(value, User.get_by_username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseDbModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if allow_default == False:
 
            if user.username == User.DEFAULT_USER:
 
                raise DefaultUserException
 
        if not allow_default and user.is_default_user:
 
            raise DefaultUserException()
 
        return user
 

	
 
    @classmethod
 
    def get_by_username_or_email(cls, username_or_email, case_insensitive=False, cache=False):
 
        """
 
        For anything that looks like an email address, look up by the email address (matching
 
        case insensitively).
 
        For anything else, try to look up by the user name.
 

	
 
        This assumes no normal username can have '@' symbol.
 
        """
 
        if '@' in username_or_email:
 
            return User.get_by_email(username_or_email, cache=cache)
 
        else:
 
            return User.get_by_username(username_or_email, case_insensitive=case_insensitive, cache=cache)
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False, cache=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.username) == func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 

	
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(username)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, cache=False, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_api_key_%s" % api_key))
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            #fallback to additional keys
 
            _res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
 
            if _res:
 
                res = _res.user
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(func.lower(cls.email) == func.lower(email))
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_email_key_%s" % email))
 

	
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(func.lower(UserEmailMap.email) == func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        log.debug('updated user %s lastlogin', self.username)
 

	
kallithea/model/permission.py
Show inline comments
 
@@ -6,168 +6,168 @@
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.permission
 
~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
permissions model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 20, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import traceback
 

	
 
from sqlalchemy.exc import DatabaseError
 

	
 
from kallithea.model.base import BaseModel
 
from kallithea.model.db import User, Permission, UserToPerm, UserRepoToPerm, \
 
    UserRepoGroupToPerm, UserUserGroupToPerm
 
from kallithea.lib.utils2 import str2bool
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PermissionModel(BaseModel):
 
    """
 
    Permissions model for Kallithea
 
    """
 

	
 
    def create_permissions(self):
 
        """
 
        Create permissions for whole system
 
        """
 
        for p in Permission.PERMS:
 
            if not Permission.get_by_key(p[0]):
 
                new_perm = Permission()
 
                new_perm.permission_name = p[0]
 
                self.sa.add(new_perm)
 

	
 
    def create_default_permissions(self, user, force=False):
 
        """
 
        Create missing default permissions for user. If force is set, the default
 
        permissions for the user are reset, otherwise only missing permissions are
 
        created.
 

	
 
        :param user:
 
        """
 
        user = User.guess_instance(user)
 

	
 
        def _make_perm(perm):
 
            new_perm = UserToPerm()
 
            new_perm.user = user
 
            new_perm.permission = Permission.get_by_key(perm)
 
            return new_perm
 

	
 
        def _get_group(perm_name):
 
            return '.'.join(perm_name.split('.')[:1])
 

	
 
        perms = UserToPerm.query().filter(UserToPerm.user == user).all()
 
        defined_perms_groups = map(_get_group,
 
                                (x.permission.permission_name for x in perms))
 
        log.debug('GOT ALREADY DEFINED:%s', perms)
 
        DEFAULT_PERMS = Permission.DEFAULT_USER_PERMISSIONS
 

	
 
        if force:
 
            for perm in perms:
 
                self.sa.delete(perm)
 
            self.sa.commit()
 
            defined_perms_groups = []
 
        # For every default permission that needs to be created, we check if
 
        # its group is already defined. If it's not, we create default permission.
 
        for perm_name in DEFAULT_PERMS:
 
            gr = _get_group(perm_name)
 
            if gr not in defined_perms_groups:
 
                log.debug('GR:%s not found, creating permission %s',
 
                          gr, perm_name)
 
                new_perm = _make_perm(perm_name)
 
                self.sa.add(new_perm)
 

	
 
    def update(self, form_result):
 
        perm_user = User.get_by_username(username=form_result['perm_user_name'])
 

	
 
        try:
 
            # stage 1 set anonymous access
 
            if perm_user.username == User.DEFAULT_USER:
 
            if perm_user.is_default_user:
 
                perm_user.active = str2bool(form_result['anonymous'])
 
                self.sa.add(perm_user)
 

	
 
            # stage 2 reset defaults and set them from form data
 
            def _make_new(usr, perm_name):
 
                log.debug('Creating new permission:%s', perm_name)
 
                new = UserToPerm()
 
                new.user = usr
 
                new.permission = Permission.get_by_key(perm_name)
 
                return new
 
            # clear current entries, to make this function idempotent
 
            # it will fix even if we define more permissions or permissions
 
            # are somehow missing
 
            u2p = self.sa.query(UserToPerm) \
 
                .filter(UserToPerm.user == perm_user) \
 
                .all()
 
            for p in u2p:
 
                self.sa.delete(p)
 
            #create fresh set of permissions
 
            for def_perm_key in ['default_repo_perm',
 
                                 'default_group_perm',
 
                                 'default_user_group_perm',
 
                                 'default_repo_create',
 
                                 'create_on_write', # special case for create repos on write access to group
 
                                 #'default_repo_group_create', #not implemented yet
 
                                 'default_user_group_create',
 
                                 'default_fork',
 
                                 'default_register',
 
                                 'default_extern_activate']:
 
                p = _make_new(perm_user, form_result[def_perm_key])
 
                self.sa.add(p)
 

	
 
            #stage 3 update all default permissions for repos if checked
 
            if form_result['overwrite_default_repo']:
 
                _def_name = form_result['default_repo_perm'].split('repository.')[-1]
 
                _def = Permission.get_by_key('repository.' + _def_name)
 
                # repos
 
                for r2p in self.sa.query(UserRepoToPerm) \
 
                               .filter(UserRepoToPerm.user == perm_user) \
 
                               .all():
 

	
 
                    #don't reset PRIVATE repositories
 
                    if not r2p.repository.private:
 
                        r2p.permission = _def
 
                        self.sa.add(r2p)
 

	
 
            if form_result['overwrite_default_group']:
 
                _def_name = form_result['default_group_perm'].split('group.')[-1]
 
                # groups
 
                _def = Permission.get_by_key('group.' + _def_name)
 
                for g2p in self.sa.query(UserRepoGroupToPerm) \
 
                               .filter(UserRepoGroupToPerm.user == perm_user) \
 
                               .all():
 
                    g2p.permission = _def
 
                    self.sa.add(g2p)
 

	
 
            if form_result['overwrite_default_user_group']:
 
                _def_name = form_result['default_user_group_perm'].split('usergroup.')[-1]
 
                # groups
 
                _def = Permission.get_by_key('usergroup.' + _def_name)
 
                for g2p in self.sa.query(UserUserGroupToPerm) \
 
                               .filter(UserUserGroupToPerm.user == perm_user) \
 
                               .all():
 
                    g2p.permission = _def
 
                    self.sa.add(g2p)
 

	
 
            self.sa.commit()
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
 
            self.sa.rollback()
 
            raise
kallithea/model/pull_request.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.pull_request
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
pull request model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import datetime
 

	
 
from pylons.i18n.translation import _
 

	
 
from sqlalchemy.orm import joinedload
 

	
 
from kallithea.model.meta import Session
 
from kallithea.lib import helpers as h
 
from kallithea.lib.exceptions import UserInvalidException
 
from kallithea.model.base import BaseModel
 
from kallithea.model.db import PullRequest, PullRequestReviewer, Notification, \
 
    ChangesetStatus, User
 
from kallithea.model.notification import NotificationModel
 
from kallithea.lib.utils2 import extract_mentioned_users, safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PullRequestModel(BaseModel):
 

	
 
    def _get_valid_reviewers(self, seq):
 
        """ Generate User objects from a sequence of user IDs, usernames or
 
        User objects. Raises UserInvalidException if the DEFAULT user is
 
        specified, or if a given ID or username does not match any user.
 
        """
 
        for user_spec in seq:
 
            user = User.guess_instance(user_spec)
 
            if user is None or user.username == User.DEFAULT_USER:
 
            if user is None or user.is_default_user:
 
                raise UserInvalidException(user_spec)
 
            yield user
 

	
 
    def create(self, created_by, org_repo, org_ref, other_repo, other_ref,
 
               revisions, title, description, reviewers):
 
        pr = PullRequest()
 
        pr.org_repo = org_repo
 
        pr.org_ref = org_ref
 
        pr.other_repo = other_repo
 
        pr.other_ref = other_ref
 
        pr.revisions = revisions
 
        pr.title = title
 
        pr.description = description
 
        pr.owner = created_by
 
        Session().add(pr)
 
        Session().flush() # make database assign pull_request_id
 

	
 
        #reset state to under-review
 
        from kallithea.model.changeset_status import ChangesetStatusModel
 
        from kallithea.model.comment import ChangesetCommentsModel
 
        comment = ChangesetCommentsModel().create(
 
            text=u'',
 
            repo=org_repo,
 
            author=created_by,
 
            pull_request=pr,
 
            send_email=False,
 
            status_change=ChangesetStatus.STATUS_UNDER_REVIEW,
 
        )
 
        ChangesetStatusModel().set_status(
 
            org_repo,
 
            ChangesetStatus.STATUS_UNDER_REVIEW,
 
            created_by,
 
            comment,
 
            pull_request=pr,
 
        )
 

	
 
        reviewers = set(self._get_valid_reviewers(reviewers))
 
        mention_recipients = extract_mentioned_users(description)
 
        self.add_reviewers(created_by, pr, reviewers, mention_recipients)
 

	
 
        return pr
 

	
 
    def add_reviewers(self, user, pr, reviewers, mention_recipients=None):
 
        """Add reviewer and send notification to them.
 
        """
 
        reviewer_users = set(self._get_valid_reviewers(reviewers))
 
        #members
 
        for reviewer in reviewer_users:
 
            prr = PullRequestReviewer(reviewer, pr)
 
            Session().add(prr)
 

	
 
        #notification to reviewers
 
        pr_url = pr.url(canonical=True)
 
        threading = ['%s-pr-%s@%s' % (pr.other_repo.repo_name,
 
                                      pr.pull_request_id,
 
                                      h.canonical_hostname())]
 
        subject = safe_unicode(
 
            h.link_to(
 
              _('%(user)s wants you to review pull request %(pr_nice_id)s: %(pr_title)s') % \
 
                {'user': user.username,
 
                 'pr_title': pr.title,
 
                 'pr_nice_id': pr.nice_id()},
 
                pr_url)
 
            )
 
        body = pr.description
 
        _org_ref_type, org_ref_name, _org_rev = pr.org_ref.split(':')
 
        _other_ref_type, other_ref_name, _other_rev = pr.other_ref.split(':')
 
        revision_data = [(x.raw_id, x.message)
 
                         for x in map(pr.org_repo.get_changeset, pr.revisions)]
 
        email_kwargs = {
 
            'pr_title': pr.title,
 
            'pr_title_short': h.shorter(pr.title, 50),
 
            'pr_user_created': user.full_name_and_username,
 
            'pr_repo_url': h.canonical_url('summary_home', repo_name=pr.other_repo.repo_name),
 
            'pr_url': pr_url,
 
            'pr_revisions': revision_data,
 
            'repo_name': pr.other_repo.repo_name,
 
            'org_repo_name': pr.org_repo.repo_name,
 
            'pr_nice_id': pr.nice_id(),
 
            'pr_target_repo': h.canonical_url('summary_home',
 
                               repo_name=pr.other_repo.repo_name),
 
            'pr_target_branch': other_ref_name,
 
            'pr_source_repo': h.canonical_url('summary_home',
 
                               repo_name=pr.org_repo.repo_name),
 
            'pr_source_branch': org_ref_name,
 
            'pr_owner': pr.owner,
 
            'pr_owner_username': pr.owner.username,
 
            'pr_username': user.username,
 
            'threading': threading,
 
            'is_mention': False,
 
            }
 
        if reviewers:
 
            NotificationModel().create(created_by=user, subject=subject, body=body,
 
                                       recipients=reviewers,
 
                                       type_=Notification.TYPE_PULL_REQUEST,
 
                                       email_kwargs=email_kwargs)
kallithea/model/repo_group.py
Show inline comments
 
@@ -109,193 +109,193 @@ class RepoGroupModel(BaseModel):
 
        shutil.move(old_path, new_path)
 

	
 
    def _delete_group(self, group, force_delete=False):
 
        """
 
        Deletes a group from a filesystem
 

	
 
        :param group: instance of group from database
 
        :param force_delete: use shutil rmtree to remove all objects
 
        """
 
        paths = group.full_path.split(RepoGroup.url_sep())
 
        paths = os.sep.join(paths)
 

	
 
        rm_path = os.path.join(self.repos_path, paths)
 
        log.info("Removing group %s", rm_path)
 
        # delete only if that path really exists
 
        if os.path.isdir(rm_path):
 
            if force_delete:
 
                shutil.rmtree(rm_path)
 
            else:
 
                #archive that group`
 
                _now = datetime.datetime.now()
 
                _ms = str(_now.microsecond).rjust(6, '0')
 
                _d = 'rm__%s_GROUP_%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                                          group.name)
 
                shutil.move(rm_path, os.path.join(self.repos_path, _d))
 

	
 
    def create(self, group_name, group_description, owner, parent=None,
 
               just_db=False, copy_permissions=False):
 
        try:
 
            owner = User.guess_instance(owner)
 
            parent_group = RepoGroup.guess_instance(parent)
 
            new_repo_group = RepoGroup()
 
            new_repo_group.owner = owner
 
            new_repo_group.group_description = group_description or group_name
 
            new_repo_group.parent_group = parent_group
 
            new_repo_group.group_name = new_repo_group.get_new_name(group_name)
 

	
 
            self.sa.add(new_repo_group)
 

	
 
            # create an ADMIN permission for owner except if we're super admin,
 
            # later owner should go into the owner field of groups
 
            if not owner.is_admin:
 
                self.grant_user_permission(repo_group=new_repo_group,
 
                                           user=owner, perm='group.admin')
 

	
 
            if parent_group and copy_permissions:
 
                # copy permissions from parent
 
                user_perms = UserRepoGroupToPerm.query() \
 
                    .filter(UserRepoGroupToPerm.group == parent_group).all()
 

	
 
                group_perms = UserGroupRepoGroupToPerm.query() \
 
                    .filter(UserGroupRepoGroupToPerm.group == parent_group).all()
 

	
 
                for perm in user_perms:
 
                    # don't copy over the permission for user who is creating
 
                    # this group, if he is not super admin he get's admin
 
                    # permission set above
 
                    if perm.user != owner or owner.is_admin:
 
                        UserRepoGroupToPerm.create(perm.user, new_repo_group, perm.permission)
 

	
 
                for perm in group_perms:
 
                    UserGroupRepoGroupToPerm.create(perm.users_group, new_repo_group, perm.permission)
 
            else:
 
                perm_obj = self._create_default_perms(new_repo_group)
 
                self.sa.add(perm_obj)
 

	
 
            if not just_db:
 
                # we need to flush here, in order to check if database won't
 
                # throw any exceptions, create filesystem dirs at the very end
 
                self.sa.flush()
 
                self._create_group(new_repo_group.group_name)
 

	
 
            return new_repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _update_permissions(self, repo_group, perms_new=None,
 
                            perms_updates=None, recursive=None,
 
                            check_perms=True):
 
        from kallithea.model.repo import RepoModel
 
        from kallithea.lib.auth import HasUserGroupPermissionLevel
 

	
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        def _set_perm_user(obj, user, perm):
 
            if isinstance(obj, RepoGroup):
 
                self.grant_user_permission(repo_group=obj, user=user, perm=perm)
 
            elif isinstance(obj, Repository):
 
                user = User.guess_instance(user)
 

	
 
                # private repos will not allow to change the default permissions
 
                # using recursive mode
 
                if obj.private and user.username == User.DEFAULT_USER:
 
                if obj.private and user.is_default_user:
 
                    return
 

	
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_user_permission(
 
                    repo=obj, user=user, perm=perm
 
                )
 

	
 
        def _set_perm_group(obj, users_group, perm):
 
            if isinstance(obj, RepoGroup):
 
                self.grant_user_group_permission(repo_group=obj,
 
                                                  group_name=users_group,
 
                                                  perm=perm)
 
            elif isinstance(obj, Repository):
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_user_group_permission(
 
                    repo=obj, group_name=users_group, perm=perm
 
                )
 

	
 
        # start updates
 
        updates = []
 
        log.debug('Now updating permissions for %s in recursive mode:%s',
 
                  repo_group, recursive)
 

	
 
        for obj in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                pass
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(obj, RepoGroup) and not obj == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(obj, Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                obj = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for user group
 
                else:
 
                    #check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    #check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
        return updates
 

	
 
    def update(self, repo_group, form_data):
 

	
 
        try:
 
            repo_group = RepoGroup.guess_instance(repo_group)
 
            old_path = repo_group.full_path
 

	
 
            # change properties
 
            repo_group.group_description = form_data['group_description']
 
            repo_group.parent_group_id = form_data['parent_group_id']
 
            repo_group.enable_locking = form_data['enable_locking']
 

	
 
            repo_group.parent_group = RepoGroup.get(form_data['parent_group_id'])
 
            repo_group.group_name = repo_group.get_new_name(form_data['group_name'])
 
            new_path = repo_group.full_path
 
            self.sa.add(repo_group)
 

	
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
 
            # to the parent
 
            # if obj is a Repo fix it's name
 
            # this can be potentially heavy operation
 
            for obj in repo_group.recursive_groups_and_repos():
 
                #set the value from it's parent
 
                obj.enable_locking = repo_group.enable_locking
kallithea/model/user.py
Show inline comments
 
@@ -117,234 +117,234 @@ class UserModel(BaseModel):
 
        :param cur_user:
 
        """
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        from kallithea.lib.auth import get_crypt_password, check_password
 
        from kallithea.lib.hooks import log_create_user, \
 
            check_allowed_create_user
 
        user_data = {
 
            'username': username, 'password': password,
 
            'email': email, 'firstname': firstname, 'lastname': lastname,
 
            'active': active, 'admin': admin
 
        }
 
        # raises UserCreationError if it's not allowed
 
        check_allowed_create_user(user_data, cur_user)
 

	
 
        log.debug('Checking for %s account in Kallithea database', username)
 
        user = User.get_by_username(username, case_insensitive=True)
 
        if user is None:
 
            log.debug('creating new user %s', username)
 
            new_user = User()
 
            edit = False
 
        else:
 
            log.debug('updating user %s', username)
 
            new_user = user
 
            edit = True
 

	
 
        try:
 
            new_user.username = username
 
            new_user.admin = admin
 
            new_user.email = email
 
            new_user.active = active
 
            new_user.extern_name = safe_str(extern_name) \
 
                if extern_name else None
 
            new_user.extern_type = safe_str(extern_type) \
 
                if extern_type else None
 
            new_user.name = firstname
 
            new_user.lastname = lastname
 

	
 
            if not edit:
 
                new_user.api_key = generate_api_key()
 

	
 
            # set password only if creating an user or password is changed
 
            password_change = new_user.password and \
 
                not check_password(password, new_user.password)
 
            if not edit or password_change:
 
                reason = 'new password' if edit else 'new user'
 
                log.debug('Updating password reason=>%s', reason)
 
                new_user.password = get_crypt_password(password) \
 
                    if password else ''
 

	
 
            if user is None:
 
                Session().add(new_user)
 
                Session().flush() # make database assign new_user.user_id
 

	
 
            if not edit:
 
                log_create_user(new_user.get_dict(), cur_user)
 

	
 
            return new_user
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_registration(self, form_data):
 
        from kallithea.model.notification import NotificationModel
 
        import kallithea.lib.helpers as h
 

	
 
        form_data['admin'] = False
 
        form_data['extern_type'] = User.DEFAULT_AUTH_TYPE
 
        form_data['extern_name'] = ''
 
        new_user = self.create(form_data)
 

	
 
        # notification to admins
 
        subject = _('New user registration')
 
        body = (
 
            u'New user registration\n'
 
            '---------------------\n'
 
            '- Username: {user.username}\n'
 
            '- Full Name: {user.full_name}\n'
 
            '- Email: {user.email}\n'
 
            ).format(user=new_user)
 
        edit_url = h.canonical_url('edit_user', id=new_user.user_id)
 
        email_kwargs = {
 
            'registered_user_url': edit_url,
 
            'new_username': new_user.username,
 
            'new_email': new_user.email,
 
            'new_full_name': new_user.full_name}
 
        NotificationModel().create(created_by=new_user, subject=subject,
 
                                   body=body, recipients=None,
 
                                   type_=Notification.TYPE_REGISTRATION,
 
                                   email_kwargs=email_kwargs)
 

	
 
    def update(self, user_id, form_data, skip_attrs=None):
 
        from kallithea.lib.auth import get_crypt_password
 
        skip_attrs = skip_attrs or []
 
        user = self.get(user_id, cache=False)
 
        if user.username == User.DEFAULT_USER:
 
        if user.is_default_user:
 
            raise DefaultUserException(
 
                            _("You can't edit this user since it's "
 
                              "crucial for entire application"))
 

	
 
        for k, v in form_data.items():
 
            if k in skip_attrs:
 
                continue
 
            if k == 'new_password' and v:
 
                user.password = get_crypt_password(v)
 
            else:
 
                # old legacy thing orm models store firstname as name,
 
                # need proper refactor to username
 
                if k == 'firstname':
 
                    k = 'name'
 
                setattr(user, k, v)
 
        self.sa.add(user)
 

	
 
    def update_user(self, user, **kwargs):
 
        from kallithea.lib.auth import get_crypt_password
 

	
 
        user = User.guess_instance(user)
 
        if user.username == User.DEFAULT_USER:
 
        if user.is_default_user:
 
            raise DefaultUserException(
 
                _("You can't edit this user since it's"
 
                  " crucial for entire application")
 
            )
 

	
 
        for k, v in kwargs.items():
 
            if k == 'password' and v:
 
                v = get_crypt_password(v)
 

	
 
            setattr(user, k, v)
 
        self.sa.add(user)
 
        return user
 

	
 
    def delete(self, user, cur_user=None):
 
        if cur_user is None:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 
        user = User.guess_instance(user)
 

	
 
        if user.username == User.DEFAULT_USER:
 
        if user.is_default_user:
 
            raise DefaultUserException(
 
                _("You can't remove this user since it is"
 
                  " crucial for the entire application"))
 
        if user.repositories:
 
            repos = [x.repo_name for x in user.repositories]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s repositories and cannot be '
 
                  'removed. Switch owners or remove those repositories: %s')
 
                % (user.username, len(repos), ', '.join(repos)))
 
        if user.repo_groups:
 
            repogroups = [x.group_name for x in user.repo_groups]
 
            raise UserOwnsReposException(_(
 
                'User "%s" still owns %s repository groups and cannot be '
 
                'removed. Switch owners or remove those repository groups: %s')
 
                % (user.username, len(repogroups), ', '.join(repogroups)))
 
        if user.user_groups:
 
            usergroups = [x.users_group_name for x in user.user_groups]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s user groups and cannot be '
 
                  'removed. Switch owners or remove those user groups: %s')
 
                % (user.username, len(usergroups), ', '.join(usergroups)))
 
        self.sa.delete(user)
 

	
 
        from kallithea.lib.hooks import log_delete_user
 
        log_delete_user(user.get_dict(), cur_user)
 

	
 
    def can_change_password(self, user):
 
        from kallithea.lib import auth_modules
 
        managed_fields = auth_modules.get_managed_fields(user)
 
        return 'password' not in managed_fields
 

	
 
    def get_reset_password_token(self, user, timestamp, session_id):
 
        """
 
        The token is a 40-digit hexstring, calculated as a HMAC-SHA1.
 

	
 
        In a traditional HMAC scenario, an attacker is unable to know or
 
        influence the secret key, but can know or influence the message
 
        and token. This scenario is slightly different (in particular
 
        since the message sender is also the message recipient), but
 
        sufficiently similar to use an HMAC. Benefits compared to a plain
 
        SHA1 hash includes resistance against a length extension attack.
 

	
 
        The HMAC key consists of the following values (known only to the
 
        server and authorized users):
 

	
 
        * per-application secret (the `app_instance_uuid` setting), without
 
          which an attacker cannot counterfeit tokens
 
        * hashed user password, invalidating the token upon password change
 

	
 
        The HMAC message consists of the following values (potentially known
 
        to an attacker):
 

	
 
        * session ID (the anti-CSRF token), requiring an attacker to have
 
          access to the browser session in which the token was created
 
        * numeric user ID, limiting the token to a specific user (yet allowing
 
          users to be renamed)
 
        * user email address
 
        * time of token issue (a Unix timestamp, to enable token expiration)
 

	
 
        The key and message values are separated by NUL characters, which are
 
        guaranteed not to occur in any of the values.
 
        """
 
        app_secret = config.get('app_instance_uuid')
 
        return hmac.HMAC(
 
            key=u'\0'.join([app_secret, user.password]).encode('utf-8'),
 
            msg=u'\0'.join([session_id, str(user.user_id), user.email, str(timestamp)]).encode('utf-8'),
 
            digestmod=hashlib.sha1,
 
        ).hexdigest()
 

	
 
    def send_reset_password_email(self, data):
 
        """
 
        Sends email with a password reset token and link to the password
 
        reset confirmation page with all information (including the token)
 
        pre-filled. Also returns URL of that page, only without the token,
 
        allowing users to copy-paste or manually enter the token from the
 
        email.
 
        """
 
        from kallithea.lib.celerylib import tasks
 
        from kallithea.model.notification import EmailNotificationModel
 
        import kallithea.lib.helpers as h
 

	
 
        user_email = data['email']
 
        user = User.get_by_email(user_email)
 
        timestamp = int(time.time())
 
        if user is not None:
 
            if self.can_change_password(user):
 
                log.debug('password reset user %s found', user)
 
                token = self.get_reset_password_token(user,
 
                                                      timestamp,
 
                                                      h.authentication_token())
 
                # URL must be fully qualified; but since the token is locked to
 
                # the current browser session, we must provide a URL with the
 
                # current scheme and hostname, rather than the canonical_url.
 
                link = h.url('reset_password_confirmation', qualified=True,
 
                             email=user_email,
 
                             timestamp=timestamp,
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -123,193 +123,193 @@ class _BaseTestApi(object):
 
            'error': None,
 
            'result': expected
 
        })
 
        given = json.loads(given)
 
        assert expected == given
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': expected,
 
            'result': None
 
        })
 
        given = json.loads(given)
 
        assert expected == given
 

	
 
    def test_Optional_object(self):
 
        from kallithea.controllers.api.api import Optional
 

	
 
        option1 = Optional(None)
 
        assert '<Optional:%s>' % None == repr(option1)
 
        assert option1() == None
 

	
 
        assert 1 == Optional.extract(Optional(1))
 
        assert 'trololo' == Optional.extract('trololo')
 

	
 
    def test_Optional_OAttr(self):
 
        from kallithea.controllers.api.api import Optional, OAttr
 

	
 
        option1 = Optional(OAttr('apiuser'))
 
        assert 'apiuser' == Optional.extract(option1)
 

	
 
    def test_OAttr_object(self):
 
        from kallithea.controllers.api.api import OAttr
 

	
 
        oattr1 = OAttr('apiuser')
 
        assert '<OptionalAttr:apiuser>' == repr(oattr1)
 
        assert oattr1() == oattr1
 

	
 
    def test_api_wrong_key(self):
 
        id_, params = _build_data('trololo', 'get_user')
 
        response = api_call(self, params)
 

	
 
        expected = 'Invalid API key'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_null(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_args_is_null(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_is_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_different_args(self):
 
        import string
 
        expected = {
 
            'ascii_letters': string.ascii_letters,
 
            'ws': string.whitespace,
 
            'printables': string.printable
 
        }
 
        id_, params = _build_data(self.apikey, 'test', args=expected)
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 
        self._compare_ok(id_, expected, response.body)
 

	
 
    def test_api_get_users(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        response = api_call(self, params)
 
        ret_all = []
 
        _users = User.query().filter(User.username != User.DEFAULT_USER) \
 
        _users = User.query().filter_by(is_default_user=False) \
 
            .order_by(User.username).all()
 
        for usr in _users:
 
            ret = usr.get_api_data()
 
            ret_all.append(jsonify(ret))
 
        expected = ret_all
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_that_does_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` does not exist" % 'trololo'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(self.TEST_USER_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_with_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull(self):
 
        repo_name = u'test_pull'
 
        r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        r.clone_uri = os.path.join(Ui.get_by_key('paths', '/').ui_value, self.REPO)
 
        Session.commit()
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_pull_error(self):
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=self.REPO, )
 
        response = api_call(self, params)
 

	
 
        expected = 'Unable to pull changes from `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_rescan_repos(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos')
 
        response = api_call(self, params)
 

	
 
        expected = {'added': [], 'removed': []}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'repo_scan', crash)
 
    def test_api_rescann_error(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos', )
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during rescan repositories action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
kallithea/tests/other/manual_test_vcs_operations.py
Show inline comments
 
@@ -65,197 +65,197 @@ class Command(object):
 

	
 
        command = cmd + ' ' + ' '.join(args)
 
        if DEBUG:
 
            print '*** CMD %s ***' % command
 
        testenv = dict(os.environ)
 
        testenv['LANG'] = 'en_US.UTF-8'
 
        testenv['LANGUAGE'] = 'en_US:en'
 
        testenv.update(environ)
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            if stdout:
 
                print 'stdout:', repr(stdout)
 
            if stderr:
 
                print 'stderr:', repr(stderr)
 
        return stdout, stderr
 

	
 

	
 
def _get_tmp_dir():
 
    return tempfile.mkdtemp(prefix='rc_integration_test')
 

	
 

	
 
def _construct_url(repo, dest=None, **kwargs):
 
    if dest is None:
 
        #make temp clone
 
        dest = _get_tmp_dir()
 
    params = {
 
        'user': TEST_USER_ADMIN_LOGIN,
 
        'passwd': TEST_USER_ADMIN_PASS,
 
        'host': HOST,
 
        'cloned_repo': repo,
 
        'dest': dest
 
    }
 
    params.update(**kwargs)
 
    if params['user'] and params['passwd']:
 
        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
 
    else:
 
        _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
 
    return _url
 

	
 

	
 
def _add_files_and_push(vcs, DEST, **kwargs):
 
    """
 
    Generate some files, add it to DEST repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param DEST:
 
    """
 
    # commit some stuff into this repo
 
    cwd = path = os.path.join(DEST)
 
    #added_file = os.path.join(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    added_file = os.path.join(path, '%ssetup.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('%s add %s' % (vcs, added_file))
 

	
 
    email = 'me@example.com'
 
    if os.name == 'nt':
 
        author_str = 'User <%s>' % email
 
    else:
 
        author_str = 'User ǝɯɐᴎ <%s>' % email
 
    for i in xrange(kwargs.get('files_no', 3)):
 
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
        if vcs == 'hg':
 
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        # git commit needs EMAIL on some machines
 
        Command(cwd).execute(cmd, EMAIL=email)
 

	
 
    # PUSH it back
 
    _REPO = None
 
    if vcs == 'hg':
 
        _REPO = HG_REPO
 
    elif vcs == 'git':
 
        _REPO = GIT_REPO
 

	
 
    kwargs['dest'] = ''
 
    clone_url = _construct_url(_REPO, **kwargs)
 
    if 'clone_url' in kwargs:
 
        clone_url = kwargs['clone_url']
 
    stdout = stderr = None
 
    if vcs == 'hg':
 
        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
 
    elif vcs == 'git':
 
        stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")
 

	
 
    return stdout, stderr
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_by_username(User.DEFAULT_USER)
 
    user = User.get_default_user()
 
    user.active = enable
 
    Session().commit()
 
    print '\tanonymous access is now:', enable
 
    if enable != User.get_by_username(User.DEFAULT_USER).active:
 
    if enable != User.get_default_user().active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 

	
 
def _check_proper_git_push(stdout, stderr):
 
    #WTF Git stderr is output ?!
 
    assert 'fatal' not in stderr
 
    assert 'rejected' not in stderr
 
    assert 'Pushing to' in stderr
 
    assert 'master -> master' in stderr
 

	
 

	
 
class TestVCSOperations(TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        #DISABLE ANONYMOUS ACCESS
 
        set_anonymous_access(False)
 

	
 
    def setup_method(self, method):
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().commit()
 

	
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().commit()
 

	
 
    def test_clone_hg_repo_by_admin(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_clone_git_repo_by_admin(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
 

	
 
    def test_clone_wrong_credentials_hg(self):
 
        clone_url = _construct_url(HG_REPO, passwd='bad!')
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_clone_wrong_credentials_git(self):
 
        clone_url = _construct_url(GIT_REPO, passwd='bad!')
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        assert 'not found' in stderr
 

	
 
    def test_clone_non_existing_path_hg(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_non_existing_path_git(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        assert 'not found' in stderr
 

	
 
    def test_push_new_file_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        clone_url = _construct_url(fork_name).split()[0]
 
        stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
0 comments (0 inline, 0 general)