Changeset - 2e3e1dacdbb7
[Not reviewed]
default
0 5 0
Mads Kiilerich - 7 years ago 2018-12-26 02:11:55
mads@kiilerich.com
auth: drop confusing and layering-violating User.AuthUser property

Keep it simple and just explicitly create an AuthUser if "needed". That can
perhaps be simplified further later on.
5 files changed with 7 insertions and 15 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/my_account.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.my_account
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
my account controller for Kallithea admin
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: August 20, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 
import formencode
 

	
 
from sqlalchemy import func
 
from formencode import htmlfill
 
from tg import request, tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib import auth_modules
 
from kallithea.lib.auth import LoginRequired, AuthUser
 
from kallithea.lib.base import BaseController, render
 
from kallithea.lib.utils2 import generate_api_key, safe_int
 
from kallithea.model.db import Repository, UserEmailMap, User, UserFollowing
 
from kallithea.model.forms import UserForm, PasswordChangeForm
 
from kallithea.model.user import UserModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MyAccountController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('setting', 'settings', controller='admin/settings',
 
    #         path_prefix='/admin', name_prefix='admin_')
 

	
 
    @LoginRequired()
 
    def _before(self, *args, **kwargs):
 
        super(MyAccountController, self)._before(*args, **kwargs)
 

	
 
    def __load_data(self):
 
        c.user = User.get(request.authuser.user_id)
 
        if c.user.is_default_user:
 
            h.flash(_("You can't edit this user since it's"
 
                      " crucial for entire application"), category='warning')
 
            raise HTTPFound(location=url('users'))
 

	
 
    def _load_my_repos_data(self, watched=False):
 
        if watched:
 
            admin = False
 
            repos_list = Session().query(Repository) \
 
                         .join(UserFollowing) \
 
                         .filter(UserFollowing.user_id ==
 
                                 request.authuser.user_id).all()
 
        else:
 
            admin = True
 
            repos_list = Session().query(Repository) \
 
                         .filter(Repository.owner_id ==
 
                                 request.authuser.user_id).all()
 

	
 
        return RepoModel().get_repos_as_dict(repos_list, admin=admin)
 

	
 
    def my_account(self):
 
        c.active = 'profile'
 
        self.__load_data()
 
        c.perm_user = AuthUser(user_id=request.authuser.user_id)
 
        managed_fields = auth_modules.get_managed_fields(c.user)
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        def_user_perms = AuthUser(dbuser=User.get_default_user()).permissions['global']
 
        if 'hg.register.none' in def_user_perms:
 
            managed_fields.extend(['username', 'firstname', 'lastname', 'email'])
 

	
 
        c.readonly = lambda n: 'readonly' if n in managed_fields else None
 

	
 
        defaults = c.user.get_dict()
 
        update = False
 
        if request.POST:
 
            _form = UserForm(edit=True,
 
                             old_data={'user_id': request.authuser.user_id,
 
                                       'email': request.authuser.email})()
 
            form_result = {}
 
            try:
 
                post_data = dict(request.POST)
 
                post_data['new_password'] = ''
 
                post_data['password_confirmation'] = ''
 
                form_result = _form.to_python(post_data)
 
                # skip updating those attrs for my account
 
                skip_attrs = ['admin', 'active', 'extern_type', 'extern_name',
 
                              'new_password', 'password_confirmation',
 
                             ] + managed_fields
 

	
 
                UserModel().update(request.authuser.user_id, form_result,
 
                                   skip_attrs=skip_attrs)
 
                h.flash(_('Your account was updated successfully'),
 
                        category='success')
 
                Session().commit()
 
                update = True
 

	
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('admin/my_account/my_account.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during update of user %s')
 
                        % form_result.get('username'), category='error')
 
        if update:
 
            raise HTTPFound(location='my_account')
 
        return htmlfill.render(
 
            render('admin/my_account/my_account.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def my_account_password(self):
 
        c.active = 'password'
 
        self.__load_data()
 

	
 
        managed_fields = auth_modules.get_managed_fields(c.user)
 
        c.can_change_password = 'password' not in managed_fields
 

	
 
        if request.POST and c.can_change_password:
 
            _form = PasswordChangeForm(request.authuser.username)()
 
            try:
 
                form_result = _form.to_python(request.POST)
 
                UserModel().update(request.authuser.user_id, form_result)
 
                Session().commit()
 
                h.flash(_("Successfully updated password"), category='success')
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('admin/my_account/my_account.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during update of user password'),
 
                        category='error')
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_repos(self):
 
        c.active = 'repos'
 
        self.__load_data()
 

	
 
        # data used to render the grid
 
        c.data = self._load_my_repos_data()
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_watched(self):
 
        c.active = 'watched'
 
        self.__load_data()
 

	
 
        # data used to render the grid
 
        c.data = self._load_my_repos_data(watched=True)
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_perms(self):
 
        c.active = 'perms'
 
        self.__load_data()
 
        c.perm_user = AuthUser(user_id=request.authuser.user_id)
 

	
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_emails(self):
 
        c.active = 'emails'
 
        self.__load_data()
 

	
 
        c.user_email_map = UserEmailMap.query() \
 
            .filter(UserEmailMap.user == c.user).all()
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_emails_add(self):
 
        email = request.POST.get('new_email')
 

	
 
        try:
 
            UserModel().add_extra_email(request.authuser.user_id, email)
 
            Session().commit()
 
            h.flash(_("Added email %s to user") % email, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('my_account_emails'))
 

	
 
    def my_account_emails_delete(self):
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(request.authuser.user_id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('my_account_emails'))
 

	
 
    def my_account_api_keys(self):
 
        c.active = 'api_keys'
 
        self.__load_data()
 
        show_expired = True
 
        c.lifetime_values = [
 
            (str(-1), _('Forever')),
 
            (str(5), _('5 minutes')),
 
            (str(60), _('1 hour')),
 
            (str(60 * 24), _('1 day')),
 
            (str(60 * 24 * 30), _('1 month')),
 
        ]
 
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
 
        c.user_api_keys = ApiKeyModel().get_api_keys(request.authuser.user_id,
 
                                                     show_expired=show_expired)
 
        return render('admin/my_account/my_account.html')
 

	
 
    def my_account_api_keys_add(self):
 
        lifetime = safe_int(request.POST.get('lifetime'), -1)
 
        description = request.POST.get('description')
 
        ApiKeyModel().create(request.authuser.user_id, description, lifetime)
 
        Session().commit()
 
        h.flash(_("API key successfully created"), category='success')
 
        raise HTTPFound(location=url('my_account_api_keys'))
 

	
 
    def my_account_api_keys_delete(self):
 
        api_key = request.POST.get('del_api_key')
 
        if request.POST.get('del_api_key_builtin'):
 
            user = User.get(request.authuser.user_id)
 
            user.api_key = generate_api_key()
 
            Session().commit()
 
            h.flash(_("API key successfully reset"), category='success')
 
        elif api_key:
 
            ApiKeyModel().delete(api_key, request.authuser.user_id)
 
            Session().commit()
 
            h.flash(_("API key successfully deleted"), category='success')
 

	
 
        raise HTTPFound(location=url('my_account_api_keys'))
kallithea/controllers/admin/permissions.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.permissions
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
permissions controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 27, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import traceback
 
import formencode
 
from formencode import htmlfill
 

	
 
from tg import request, tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import LoginRequired, HasPermissionAnyDecorator
 
from kallithea.lib.auth import LoginRequired, HasPermissionAnyDecorator, AuthUser
 
from kallithea.lib.base import BaseController, render
 
from kallithea.model.forms import DefaultPermissionsForm
 
from kallithea.model.permission import PermissionModel
 
from kallithea.model.db import User, UserIpMap
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PermissionsController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('permission', 'permissions')
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def _before(self, *args, **kwargs):
 
        super(PermissionsController, self)._before(*args, **kwargs)
 

	
 
    def __load_data(self):
 
        c.repo_perms_choices = [('repository.none', _('None'),),
 
                                   ('repository.read', _('Read'),),
 
                                   ('repository.write', _('Write'),),
 
                                   ('repository.admin', _('Admin'),)]
 
        c.group_perms_choices = [('group.none', _('None'),),
 
                                 ('group.read', _('Read'),),
 
                                 ('group.write', _('Write'),),
 
                                 ('group.admin', _('Admin'),)]
 
        c.user_group_perms_choices = [('usergroup.none', _('None'),),
 
                                      ('usergroup.read', _('Read'),),
 
                                      ('usergroup.write', _('Write'),),
 
                                      ('usergroup.admin', _('Admin'),)]
 
        c.register_choices = [
 
            ('hg.register.none',
 
                _('Disabled')),
 
            ('hg.register.manual_activate',
 
                _('Allowed with manual account activation')),
 
            ('hg.register.auto_activate',
 
                _('Allowed with automatic account activation')), ]
 

	
 
        c.extern_activate_choices = [
 
            ('hg.extern_activate.manual', _('Manual activation of external account')),
 
            ('hg.extern_activate.auto', _('Automatic activation of external account')),
 
        ]
 

	
 
        c.repo_create_choices = [('hg.create.none', _('Disabled')),
 
                                 ('hg.create.repository', _('Enabled'))]
 

	
 
        c.repo_create_on_write_choices = [
 
            ('hg.create.write_on_repogroup.true', _('Enabled')),
 
            ('hg.create.write_on_repogroup.false', _('Disabled')),
 
        ]
 

	
 
        c.user_group_create_choices = [('hg.usergroup.create.false', _('Disabled')),
 
                                       ('hg.usergroup.create.true', _('Enabled'))]
 

	
 
        c.repo_group_create_choices = [('hg.repogroup.create.false', _('Disabled')),
 
                                       ('hg.repogroup.create.true', _('Enabled'))]
 

	
 
        c.fork_choices = [('hg.fork.none', _('Disabled')),
 
                          ('hg.fork.repository', _('Enabled'))]
 

	
 
    def permission_globals(self):
 
        c.active = 'globals'
 
        self.__load_data()
 
        if request.POST:
 
            _form = DefaultPermissionsForm(
 
                [x[0] for x in c.repo_perms_choices],
 
                [x[0] for x in c.group_perms_choices],
 
                [x[0] for x in c.user_group_perms_choices],
 
                [x[0] for x in c.repo_create_choices],
 
                [x[0] for x in c.repo_create_on_write_choices],
 
                [x[0] for x in c.repo_group_create_choices],
 
                [x[0] for x in c.user_group_create_choices],
 
                [x[0] for x in c.fork_choices],
 
                [x[0] for x in c.register_choices],
 
                [x[0] for x in c.extern_activate_choices])()
 

	
 
            try:
 
                form_result = _form.to_python(dict(request.POST))
 
                form_result.update({'perm_user_name': 'default'})
 
                PermissionModel().update(form_result)
 
                Session().commit()
 
                h.flash(_('Global permissions updated successfully'),
 
                        category='success')
 

	
 
            except formencode.Invalid as errors:
 
                defaults = errors.value
 

	
 
                return htmlfill.render(
 
                    render('admin/permissions/permissions.html'),
 
                    defaults=defaults,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during update of permissions'),
 
                        category='error')
 

	
 
            raise HTTPFound(location=url('admin_permissions'))
 

	
 
        c.user = User.get_default_user()
 
        defaults = {'anonymous': c.user.active}
 

	
 
        for p in c.user.user_perms:
 
            if p.permission.permission_name.startswith('repository.'):
 
                defaults['default_repo_perm'] = p.permission.permission_name
 

	
 
            if p.permission.permission_name.startswith('group.'):
 
                defaults['default_group_perm'] = p.permission.permission_name
 

	
 
            if p.permission.permission_name.startswith('usergroup.'):
 
                defaults['default_user_group_perm'] = p.permission.permission_name
 

	
 
            if p.permission.permission_name.startswith('hg.create.write_on_repogroup'):
 
                defaults['create_on_write'] = p.permission.permission_name
 

	
 
            elif p.permission.permission_name.startswith('hg.create.'):
 
                defaults['default_repo_create'] = p.permission.permission_name
 

	
 
            if p.permission.permission_name.startswith('hg.repogroup.'):
 
                defaults['default_repo_group_create'] = p.permission.permission_name
 

	
 
            if p.permission.permission_name.startswith('hg.usergroup.'):
 
                defaults['default_user_group_create'] = p.permission.permission_name
 

	
 
            if p.permission.permission_name.startswith('hg.register.'):
 
                defaults['default_register'] = p.permission.permission_name
 

	
 
            if p.permission.permission_name.startswith('hg.extern_activate.'):
 
                defaults['default_extern_activate'] = p.permission.permission_name
 

	
 
            if p.permission.permission_name.startswith('hg.fork.'):
 
                defaults['default_fork'] = p.permission.permission_name
 

	
 
        return htmlfill.render(
 
            render('admin/permissions/permissions.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def permission_ips(self):
 
        c.active = 'ips'
 
        c.user = User.get_default_user()
 
        c.user_ip_map = UserIpMap.query() \
 
                        .filter(UserIpMap.user == c.user).all()
 

	
 
        return render('admin/permissions/permissions.html')
 

	
 
    def permission_perms(self):
 
        c.active = 'perms'
 
        c.user = User.get_default_user()
 
        c.perm_user = c.user.AuthUser
 
        c.perm_user = AuthUser(dbuser=c.user)
 
        return render('admin/permissions/permissions.html')
kallithea/controllers/login.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.login
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Login controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 22, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import re
 
import formencode
 

	
 
from formencode import htmlfill
 
from tg.i18n import ugettext as _
 
from tg import request, session, tmpl_context as c
 
from webob.exc import HTTPFound, HTTPBadRequest
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.config.routing import url
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator
 
from kallithea.lib.base import BaseController, log_in_user, render
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.model.db import User, Setting
 
from kallithea.model.forms import \
 
    LoginForm, RegisterForm, PasswordResetRequestForm, PasswordResetConfirmationForm
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class LoginController(BaseController):
 

	
 
    def _validate_came_from(self, came_from,
 
            _re=re.compile(r"/(?!/)[-!#$%&'()*+,./:;=?@_~0-9A-Za-z]*$")):
 
        """Return True if came_from is valid and can and should be used.
 

	
 
        Determines if a URI reference is valid and relative to the origin;
 
        or in RFC 3986 terms, whether it matches this production:
 

	
 
          origin-relative-ref = path-absolute [ "?" query ] [ "#" fragment ]
 

	
 
        with the exception that '%' escapes are not validated and '#' is
 
        allowed inside the fragment part.
 
        """
 
        return _re.match(came_from) is not None
 

	
 
    def index(self):
 
        c.came_from = safe_str(request.GET.get('came_from', ''))
 
        if c.came_from:
 
            if not self._validate_came_from(c.came_from):
 
                log.error('Invalid came_from (not server-relative): %r', c.came_from)
 
                raise HTTPBadRequest()
 
        else:
 
            c.came_from = url('home')
 

	
 
        ip_allowed = AuthUser.check_ip_allowed(request.authuser, request.ip_addr)
 

	
 
        # redirect if already logged in
 
        if request.authuser.is_authenticated and ip_allowed:
 
            raise HTTPFound(location=c.came_from)
 

	
 
        if request.POST:
 
            # import Login Form validator class
 
            login_form = LoginForm()()
 
            try:
 
                c.form_result = login_form.to_python(dict(request.POST))
 
                # form checks for username/password, now we're authenticated
 
                username = c.form_result['username']
 
                user = User.get_by_username_or_email(username, case_insensitive=True)
 
            except formencode.Invalid as errors:
 
                defaults = errors.value
 
                # remove password from filling in form again
 
                defaults.pop('password', None)
 
                return htmlfill.render(
 
                    render('/login.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except UserCreationError as e:
 
                # container auth or other auth functions that create users on
 
                # the fly can throw this exception signaling that there's issue
 
                # with user creation, explanation should be provided in
 
                # Exception itself
 
                h.flash(e, 'error')
 
            else:
 
                log_in_user(user, c.form_result['remember'],
 
                    is_external_auth=False)
 
                raise HTTPFound(location=c.came_from)
 

	
 
        return render('/login.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.register.auto_activate',
 
                               'hg.register.manual_activate')
 
    def register(self):
 
        c.auto_active = 'hg.register.auto_activate' in User.get_default_user() \
 
            .AuthUser.permissions['global']
 
        def_user_perms = AuthUser(dbuser=User.get_default_user()).permissions['global']
 
        c.auto_active = 'hg.register.auto_activate' in def_user_perms
 

	
 
        settings = Setting.get_app_settings()
 
        captcha_private_key = settings.get('captcha_private_key')
 
        c.captcha_active = bool(captcha_private_key)
 
        c.captcha_public_key = settings.get('captcha_public_key')
 

	
 
        if request.POST:
 
            register_form = RegisterForm()()
 
            try:
 
                form_result = register_form.to_python(dict(request.POST))
 
                form_result['active'] = c.auto_active
 

	
 
                if c.captcha_active:
 
                    from kallithea.lib.recaptcha import submit
 
                    response = submit(request.POST.get('g-recaptcha-response'),
 
                                      private_key=captcha_private_key,
 
                                      remoteip=request.ip_addr)
 
                    if not response.is_valid:
 
                        _value = form_result
 
                        _msg = _('Bad captcha')
 
                        error_dict = {'recaptcha_field': _msg}
 
                        raise formencode.Invalid(_msg, _value, None,
 
                                                 error_dict=error_dict)
 

	
 
                UserModel().create_registration(form_result)
 
                h.flash(_('You have successfully registered with %s') % (c.site_name or 'Kallithea'),
 
                        category='success')
 
                Session().commit()
 
                raise HTTPFound(location=url('login_home'))
 

	
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('/register.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except UserCreationError as e:
 
                # container auth or other auth functions that create users on
 
                # the fly can throw this exception signaling that there's issue
 
                # with user creation, explanation should be provided in
 
                # Exception itself
 
                h.flash(e, 'error')
 

	
 
        return render('/register.html')
 

	
 
    def password_reset(self):
 
        settings = Setting.get_app_settings()
 
        captcha_private_key = settings.get('captcha_private_key')
 
        c.captcha_active = bool(captcha_private_key)
 
        c.captcha_public_key = settings.get('captcha_public_key')
 

	
 
        if request.POST:
 
            password_reset_form = PasswordResetRequestForm()()
 
            try:
 
                form_result = password_reset_form.to_python(dict(request.POST))
 
                if c.captcha_active:
 
                    from kallithea.lib.recaptcha import submit
 
                    response = submit(request.POST.get('g-recaptcha-response'),
 
                                      private_key=captcha_private_key,
 
                                      remoteip=request.ip_addr)
 
                    if not response.is_valid:
 
                        _value = form_result
 
                        _msg = _('Bad captcha')
 
                        error_dict = {'recaptcha_field': _msg}
 
                        raise formencode.Invalid(_msg, _value, None,
 
                                                 error_dict=error_dict)
 
                redirect_link = UserModel().send_reset_password_email(form_result)
 
                h.flash(_('A password reset confirmation code has been sent'),
 
                            category='success')
 
                raise HTTPFound(location=redirect_link)
 

	
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('/password_reset.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 

	
 
        return render('/password_reset.html')
 

	
 
    def password_reset_confirmation(self):
 
        # This controller handles both GET and POST requests, though we
 
        # only ever perform the actual password change on POST (since
 
        # GET requests are not allowed to have side effects, and do not
 
        # receive automatic CSRF protection).
 

	
 
        # The template needs the email address outside of the form.
 
        c.email = request.params.get('email')
 

	
 
        if not request.POST:
 
            return htmlfill.render(
 
                render('/password_reset_confirmation.html'),
 
                defaults=dict(request.params),
 
                encoding='UTF-8')
 

	
 
        form = PasswordResetConfirmationForm()()
 
        try:
 
            form_result = form.to_python(dict(request.POST))
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('/password_reset_confirmation.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding='UTF-8')
 

	
 
        if not UserModel().verify_reset_password_token(
 
            form_result['email'],
 
            form_result['timestamp'],
 
            form_result['token'],
 
        ):
 
            return htmlfill.render(
 
                render('/password_reset_confirmation.html'),
 
                defaults=form_result,
 
                errors={'token': _('Invalid password reset token')},
 
                prefix_error=False,
 
                encoding='UTF-8')
 

	
 
        UserModel().reset_password(form_result['email'], form_result['password'])
 
        h.flash(_('Successfully updated password'), category='success')
 
        raise HTTPFound(location=url('login_home'))
 

	
 
    def logout(self):
 
        session.delete()
 
        log.info('Logging out and deleting session for user')
 
        raise HTTPFound(location=url('home'))
 

	
 
    def authentication_token(self):
 
        """Return the CSRF protection token for the session - just like it
 
        could have been screen scraped from a page with a form.
 
        Only intended for testing but might also be useful for other kinds
 
        of automation.
 
        """
 
        return h.authentication_token()
kallithea/lib/auth_modules/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Authentication modules
 
"""
 

	
 
import logging
 
import traceback
 
import importlib
 

	
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.lib.compat import formatted_json, hybrid_property
 
from kallithea.lib.auth import PasswordGenerator
 
from kallithea.lib.auth import PasswordGenerator, AuthUser
 
from kallithea.model.user import UserModel
 
from kallithea.model.db import Setting, User
 
from kallithea.model.meta import Session
 
from kallithea.model.user_group import UserGroupModel
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class LazyFormencode(object):
 
    def __init__(self, formencode_obj, *args, **kwargs):
 
        self.formencode_obj = formencode_obj
 
        self.args = args
 
        self.kwargs = kwargs
 

	
 
    def __call__(self, *args, **kwargs):
 
        from inspect import isfunction
 
        formencode_obj = self.formencode_obj
 
        if isfunction(formencode_obj):
 
            # case we wrap validators into functions
 
            formencode_obj = self.formencode_obj(*args, **kwargs)
 
        return formencode_obj(*self.args, **self.kwargs)
 

	
 

	
 
class KallitheaAuthPluginBase(object):
 
    auth_func_attrs = {
 
        "username": "unique username",
 
        "firstname": "first name",
 
        "lastname": "last name",
 
        "email": "email address",
 
        "groups": '["list", "of", "groups"]',
 
        "extern_name": "name in external source of record",
 
        "admin": 'True|False defines if user should be Kallithea admin',
 
    }
 

	
 
    @property
 
    def validators(self):
 
        """
 
        Exposes Kallithea validators modules
 
        """
 
        # this is a hack to overcome issues with pylons threadlocals and
 
        # translator object _() not being registered properly.
 
        class LazyCaller(object):
 
            def __init__(self, name):
 
                self.validator_name = name
 

	
 
            def __call__(self, *args, **kwargs):
 
                from kallithea.model import validators as v
 
                obj = getattr(v, self.validator_name)
 
                #log.debug('Initializing lazy formencode object: %s', obj)
 
                return LazyFormencode(obj, *args, **kwargs)
 

	
 
        class ProxyGet(object):
 
            def __getattribute__(self, name):
 
                return LazyCaller(name)
 

	
 
        return ProxyGet()
 

	
 
    @hybrid_property
 
    def name(self):
 
        """
 
        Returns the name of this authentication plugin.
 

	
 
        :returns: string
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    @hybrid_property
 
    def is_container_auth(self):
 
        """
 
        Returns bool if this module uses container auth.
 

	
 
        This property will trigger an automatic call to authenticate on
 
        a visit to the website or during a push/pull.
 

	
 
        :returns: bool
 
        """
 
        return False
 

	
 
    def accepts(self, user, accepts_empty=True):
 
        """
 
        Checks if this authentication module should accept a request for
 
        the current user.
 

	
 
        :param user: user object fetched using plugin's get_user() method.
 
        :param accepts_empty: if True accepts don't allow the user to be empty
 
        :returns: boolean
 
        """
 
        plugin_name = self.name
 
        if not user and not accepts_empty:
 
            log.debug('User is empty not allowed to authenticate')
 
            return False
 

	
 
        if user and user.extern_type and user.extern_type != plugin_name:
 
            log.debug('User %s should authenticate using %s this is %s, skipping',
 
                      user, user.extern_type, plugin_name)
 

	
 
            return False
 
        return True
 

	
 
    def get_user(self, username=None, **kwargs):
 
        """
 
        Helper method for user fetching in plugins, by default it's using
 
        simple fetch by username, but this method can be customized in plugins
 
        eg. container auth plugin to fetch user by environ params
 

	
 
        :param username: username if given to fetch from database
 
        :param kwargs: extra arguments needed for user fetching.
 
        """
 
        user = None
 
        log.debug('Trying to fetch user `%s` from Kallithea database',
 
                  username)
 
        if username:
 
            user = User.get_by_username_or_email(username)
 
            if user is None:
 
                log.debug('Fallback to fetch user in case insensitive mode')
 
                user = User.get_by_username(username, case_insensitive=True)
 
        else:
 
            log.debug('provided username:`%s` is empty skipping...', username)
 
        return user
 

	
 
    def settings(self):
 
        """
 
        Return a list of the form:
 
        [
 
            {
 
                "name": "OPTION_NAME",
 
                "type": "[bool|password|string|int|select]",
 
                ["values": ["opt1", "opt2", ...]]
 
                "validator": "expr"
 
                "description": "A short description of the option" [,
 
                "default": Default Value],
 
                ["formname": "Friendly Name for Forms"]
 
            } [, ...]
 
        ]
 

	
 
        This is used to interrogate the authentication plugin as to what
 
        settings it expects to be present and configured.
 

	
 
        'type' is a shorthand notation for what kind of value this option is.
 
        This is primarily used by the auth web form to control how the option
 
        is configured.
 
                bool : checkbox
 
                password : password input box
 
                string : input box
 
                select : single select dropdown
 

	
 
        'validator' is an lazy instantiated form field validator object, ala
 
        formencode. You need to *call* this object to init the validators.
 
        All calls to Kallithea validators should be used through self.validators
 
        which is a lazy loading proxy of formencode module.
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def plugin_settings(self):
 
        """
 
        This method is called by the authentication framework, not the .settings()
 
        method. This method adds a few default settings (e.g., "enabled"), so that
 
        plugin authors don't have to maintain a bunch of boilerplate.
 

	
 
        OVERRIDING THIS METHOD WILL CAUSE YOUR PLUGIN TO FAIL.
 
        """
 

	
 
        rcsettings = self.settings()
 
        rcsettings.insert(0, {
 
            "name": "enabled",
 
            "validator": self.validators.StringBoolean(if_missing=False),
 
            "type": "bool",
 
            "description": "Enable or Disable this Authentication Plugin",
 
            "formname": "Enabled"
 
            }
 
        )
 
        return rcsettings
 

	
 
    def auth(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Given a user object (which may be None), username, a plaintext password,
 
        and a settings object (containing all the keys needed as listed in settings()),
 
        authenticate this user's login attempt.
 

	
 
        Return None on failure. On success, return a dictionary with keys from
 
        KallitheaAuthPluginBase.auth_func_attrs.
 

	
 
        This is later validated for correctness.
 
        """
 
        raise NotImplementedError("not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Wrapper to call self.auth() that validates call on it
 
        """
 
        user_data = self.auth(userobj, username, passwd, settings, **kwargs)
 
        if user_data is not None:
 
            return self._validate_auth_return(user_data)
 
        return None
 

	
 
    def _validate_auth_return(self, user_data):
 
        if not isinstance(user_data, dict):
 
            raise Exception('returned value from auth must be a dict')
 
        for k in self.auth_func_attrs:
 
            if k not in user_data:
 
                raise Exception('Missing %s attribute from returned data' % k)
 
        return user_data
 

	
 

	
 
class KallitheaExternalAuthPlugin(KallitheaAuthPluginBase):
 
    def use_fake_password(self):
 
        """
 
        Return a boolean that indicates whether or not we should set the user's
 
        password to a random value when it is authenticated by this plugin.
 
        If your plugin provides authentication, then you will generally want this.
 

	
 
        :returns: boolean
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        user_data = super(KallitheaExternalAuthPlugin, self)._authenticate(
 
            userobj, username, passwd, settings, **kwargs)
 
        if user_data is not None:
 
            if userobj is None: # external authentication of unknown user that will be created soon
 
                def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
                def_user_perms = AuthUser(dbuser=User.get_default_user()).permissions['global']
 
                active = 'hg.extern_activate.auto' in def_user_perms
 
            else:
 
                active = userobj.active
 

	
 
            if self.use_fake_password():
 
                # Randomize the PW because we don't need it, but don't want
 
                # them blank either
 
                passwd = PasswordGenerator().gen_password(length=8)
 

	
 
            log.debug('Updating or creating user info from %s plugin',
 
                      self.name)
 
            user = UserModel().create_or_update(
 
                username=user_data['username'],
 
                password=passwd,
 
                email=user_data["email"],
 
                firstname=user_data["firstname"],
 
                lastname=user_data["lastname"],
 
                active=active,
 
                admin=user_data["admin"],
 
                extern_name=user_data["extern_name"],
 
                extern_type=self.name,
 
            )
 
            # enforce user is just in given groups, all of them has to be ones
 
            # created from plugins. We store this info in _group_data JSON field
 
            groups = user_data['groups'] or []
 
            UserGroupModel().enforce_groups(user, groups, self.name)
 
            Session().commit()
 
        return user_data
 

	
 

	
 
def loadplugin(plugin):
 
    """
 
    Imports, instantiates, and returns the authentication plugin in the module named by plugin
 
    (e.g., plugin='kallithea.lib.auth_modules.auth_internal'). Returns an instance of the
 
    KallitheaAuthPluginBase subclass on success, raises exceptions on failure.
 

	
 
    raises:
 
        AttributeError -- no KallitheaAuthPlugin class in the module
 
        TypeError -- if the KallitheaAuthPlugin is not a subclass of ours KallitheaAuthPluginBase
 
        ImportError -- if we couldn't import the plugin at all
 
    """
 
    log.debug("Importing %s", plugin)
 
    if not plugin.startswith(u'kallithea.lib.auth_modules.auth_'):
 
        parts = plugin.split(u'.lib.auth_modules.auth_', 1)
 
        if len(parts) == 2:
 
            _module, pn = parts
 
            plugin = u'kallithea.lib.auth_modules.auth_' + pn
 
    PLUGIN_CLASS_NAME = "KallitheaAuthPlugin"
 
    try:
 
        module = importlib.import_module(plugin)
 
    except (ImportError, TypeError):
 
        log.error(traceback.format_exc())
 
        # TODO: make this more error prone, if by some accident we screw up
 
        # the plugin name, the crash is pretty bad and hard to recover
 
        raise
 

	
 
    log.debug("Loaded auth plugin from %s (module:%s, file:%s)",
 
              plugin, module.__name__, module.__file__)
 

	
 
    pluginclass = getattr(module, PLUGIN_CLASS_NAME)
 
    if not issubclass(pluginclass, KallitheaAuthPluginBase):
 
        raise TypeError("Authentication class %s.KallitheaAuthPlugin is not "
 
                        "a subclass of %s" % (plugin, KallitheaAuthPluginBase))
 

	
 
    plugin = pluginclass()
 
    if plugin.plugin_settings.im_func != KallitheaAuthPluginBase.plugin_settings.im_func:
 
        raise TypeError("Authentication class %s.KallitheaAuthPluginBase "
 
                        "has overridden the plugin_settings method, which is "
 
                        "forbidden." % plugin)
 
    return plugin
 

	
 

	
 
def get_auth_plugins():
 
    """Return a list of instances of plugins that are available and enabled"""
 
    auth_plugins = []
 
    for plugin_name in Setting.get_by_name("auth_plugins").app_settings_value:
 
        try:
 
            plugin = loadplugin(plugin_name)
 
        except Exception:
 
            log.exception('Failed to load authentication module %s' % (plugin_name))
 
        else:
 
            auth_plugins.append(plugin)
 
    return auth_plugins
 

	
 

	
 
def authenticate(username, password, environ=None):
 
    """
 
    Authentication function used for access control,
 
    It tries to authenticate based on enabled authentication modules.
 

	
 
    :param username: username can be empty for container auth
 
    :param password: password can be empty for container auth
 
    :param environ: environ headers passed for container auth
 
    :returns: None if auth failed, user_data dict if auth is correct
 
    """
 

	
 
    auth_plugins = get_auth_plugins()
 
    for plugin in auth_plugins:
 
        module = plugin.__class__.__module__
 
        log.debug('Trying authentication using %s', module)
 
        # load plugin settings from Kallithea database
 
        plugin_name = plugin.name
 
        plugin_settings = {}
 
        for v in plugin.plugin_settings():
 
            conf_key = "auth_%s_%s" % (plugin_name, v["name"])
 
            setting = Setting.get_by_name(conf_key)
 
            plugin_settings[v["name"]] = setting.app_settings_value if setting else None
 
        log.debug('Settings for auth plugin %s:\n%s', plugin_name, formatted_json(plugin_settings))
 

	
 
        if not str2bool(plugin_settings["enabled"]):
 
            log.info("Authentication plugin %s is disabled, skipping for %s",
 
                     module, username)
 
            continue
 

	
 
        # use plugin's method of user extraction.
 
        user = plugin.get_user(username, environ=environ,
 
                               settings=plugin_settings)
 
        log.debug('Plugin %s extracted user `%s`', module, user)
 

	
 
        if user is not None and not user.active:
 
            log.error("Rejecting authentication of in-active user %s", user)
 
            continue
 

	
 
        if not plugin.accepts(user):
 
            log.debug('Plugin %s does not accept user `%s` for authentication',
 
                      module, user)
 
            continue
 
        else:
 
            log.debug('Plugin %s accepted user `%s` for authentication',
 
                      module, user)
 
            # The user might have tried to authenticate using their email address,
 
            # then the username variable wouldn't contain a valid username.
 
            # But as the plugin has accepted the user, .username field should
 
            # have a valid username, so use it for authentication purposes.
 
            if user is not None:
 
                username = user.username
 

	
 
        log.info('Authenticating user using %s plugin', module)
 

	
 
        # _authenticate is a wrapper for .auth() method of plugin.
 
        # it checks if .auth() sends proper data. For KallitheaExternalAuthPlugin
 
        # it also maps users to Database and maps the attributes returned
 
        # from .auth() to Kallithea database. If this function returns data
 
        # then auth is correct.
 
        user_data = plugin._authenticate(user, username, password,
 
                                           plugin_settings,
 
                                           environ=environ or {})
 
        log.debug('Plugin user data: %s', user_data)
 

	
 
        if user_data is not None:
 
            log.debug('Plugin returned proper authentication data')
 
            return user_data
 

	
 
        # we failed to Auth because .auth() method didn't return the user
 
        if username:
 
            log.warning("User `%s` failed to authenticate against %s",
 
                        username, module)
 
    return None
 

	
 

	
 
def get_managed_fields(user):
 
    """return list of fields that are managed by the user's auth source, usually some of
 
    'username', 'firstname', 'lastname', 'email', 'password'
 
    """
 
    auth_plugins = get_auth_plugins()
 
    for plugin in auth_plugins:
 
        module = plugin.__class__.__module__
 
        log.debug('testing %s (%s) with auth plugin %s', user, user.extern_type, module)
 
        if plugin.name == user.extern_type:
 
            return plugin.get_managed_fields()
 
    log.error('no auth plugin %s found for %s', user.extern_type, user)
 
    return [] # TODO: Fail badly instead of allowing everything to be edited?
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import time
 
import logging
 
import datetime
 
import traceback
 
import hashlib
 
import collections
 
import functools
 

	
 
import sqlalchemy
 
from sqlalchemy import *
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import relationship, joinedload, class_mapper, validates
 
from beaker.cache import cache_region, region_invalidate
 
from webob.exc import HTTPNotFound
 

	
 
from tg.i18n import lazy_ugettext as _
 

	
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea.lib.utils2 import str2bool, safe_str, get_changeset_safe, \
 
    safe_unicode, remove_prefix, time_to_datetime, aslist, Optional, safe_int, \
 
    get_clone_url, urlreadable
 
from kallithea.lib.compat import json
 
from kallithea.lib.caching_query import FromCache
 

	
 
from kallithea.model.meta import Base, Session
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
_hash_key = lambda k: hashlib.md5(safe_str(k)).hexdigest()
 

	
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.iteritems():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        return [
 
            (k, getattr(self, k))
 
            for k in self._get_keys()
 
        ]
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, (int, long)) or safe_str(value).isdigit():
 
            return cls.get(value)
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        Session().delete(obj)
 

	
 
    def __repr__(self):
 
        if hasattr(self, '__unicode__'):
 
            # python repr needs to return str
 
            try:
 
                return safe_str(self.__unicode__())
 
            except UnicodeDecodeError:
 
                pass
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'mysql_charset': 'utf8',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_str,
 
        'int': safe_int,
 
        'unicode': safe_unicode,
 
        'bool': str2bool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 
    DEFAULT_UPDATE_URL = ''
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert type(val) == unicode
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use unicode in app_settings_value
 

	
 
        :param val:
 
        """
 
        self._app_settings_value = safe_unicode(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (self.SETTINGS_TYPES.keys(), val))
 
        self._app_settings_type = val
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s[%s]')>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_value, self.app_settings_type
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set. Optional instance will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            val = Optional.extract(val)
 
            type = Optional.extract(type)
 
            res = cls(key, val, type)
 
            Session().add(res)
 
        else:
 
            res.app_settings_name = key
 
            if not isinstance(val, Optional):
 
                # update if set
 
                res.app_settings_value = val
 
            if not isinstance(type, Optional):
 
                # update if set
 
                res.app_settings_type = type
 
        return res
 

	
 
    @classmethod
 
    def get_app_settings(cls, cache=False):
 

	
 
        ret = cls.query()
 

	
 
        if cache:
 
            ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
 

	
 
        if ret is None:
 
            raise Exception('Could not get application settings !')
 
        settings = {}
 
        for each in ret:
 
            settings[each.app_settings_name] = \
 
                each.app_settings_value
 

	
 
        return settings
 

	
 
    @classmethod
 
    def get_auth_settings(cls, cache=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('auth_')).all()
 
        fd = {}
 
        for row in ret:
 
            fd[row.app_settings_name] = row.app_settings_value
 
        return fd
 

	
 
    @classmethod
 
    def get_default_repo_settings(cls, cache=False, strip_prefix=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('default_')).all()
 
        fd = {}
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import pkg_resources
 
        import platform
 
        import kallithea
 
        from kallithea.lib.utils import check_git_version
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': safe_unicode(platform.platform()),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': safe_unicode(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        # FIXME: ui_key as key is wrong and should be removed when the corresponding
 
        # Ui.get_by_key has been replaced by the composite key
 
        UniqueConstraint('ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.update'
 
    HOOK_REPO_SIZE = 'changegroup.repo_size'
 
    HOOK_PUSH_LOG = 'changegroup.push_logger'
 
    HOOK_PUSH_LOCK = 'prechangegroup.push_lock_handling'
 
    HOOK_PULL_LOG = 'outgoing.pull_logger'
 
    HOOK_PULL_LOCK = 'preoutgoing.pull_lock_handling'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
 
                                     cls.HOOK_PUSH_LOG, cls.HOOK_PUSH_LOCK,
 
                                     cls.HOOK_PULL_LOG, cls.HOOK_PULL_LOCK]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
 
                                      cls.HOOK_PUSH_LOG, cls.HOOK_PUSH_LOCK,
 
                                      cls.HOOK_PULL_LOG, cls.HOOK_PULL_LOCK]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s[%s]%s=>%s]>' % (self.__class__.__name__, self.ui_section,
 
                                    self.ui_key, self.ui_value)
 

	
 

	
 
class User(Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    inherit_default_permissions = Column(Boolean(), nullable=False, default=True)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @hybrid_property
 
    def is_default_user(self):
 
        return self.username == User.DEFAULT_USER
 

	
 
    @property
 
    def AuthUser(self):
 
        """
 
        Returns instance of AuthUser for this user
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        return AuthUser(dbuser=self)
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.user_id, self.username)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(User, cls).guess_instance(value, User.get_by_username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseDbModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if not allow_default and user.is_default_user:
 
            raise DefaultUserException()
 
        return user
 

	
 
    @classmethod
 
    def get_by_username_or_email(cls, username_or_email, case_insensitive=False, cache=False):
 
        """
 
        For anything that looks like an email address, look up by the email address (matching
 
        case insensitively).
 
        For anything else, try to look up by the user name.
 

	
 
        This assumes no normal username can have '@' symbol.
 
        """
 
        if '@' in username_or_email:
 
            return User.get_by_email(username_or_email, cache=cache)
 
        else:
 
            return User.get_by_username(username_or_email, case_insensitive=case_insensitive, cache=cache)
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False, cache=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.username) == func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 

	
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(username)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, cache=False, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_api_key_%s" % api_key))
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            # fallback to additional keys
 
            _res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
 
            if _res:
 
                res = _res.user
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(func.lower(cls.email) == func.lower(email))
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_email_key_%s" % email))
 

	
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(func.lower(UserEmailMap.email) == func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls, cache=False):
 
        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            emails=user.emails,
 
            active=user.active,
 
            admin=user.admin,
 
        )
 
        if details:
 
            data.update(dict(
 
                extern_type=user.extern_type,
 
                extern_name=user.extern_name,
 
                api_key=user.api_key,
 
                api_keys=user.api_keys,
 
                last_login=user.last_login,
 
                ip_addresses=user.ip_addresses
 
                ))
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
            full_name=self.full_name,
 
            full_name_or_username=self.full_name_or_username,
 
            short_contact=self.short_contact,
 
            full_contact=self.full_contact
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 

	
 
class UserApiKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_api_keys'
 
    __table_args__ = (
 
        Index('uak_api_key_idx', 'api_key'),
 
        Index('uak_api_key_expires_idx', 'api_key', 'expires'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    user_api_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    api_key = Column(String(255), nullable=False, unique=True)
 
    description = Column(UnicodeText(), nullable=False)
 
    expires = Column(Float(53), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.expires != -1) & (time.time() > self.expires)
 

	
 

	
 
class UserEmailMap(Base, BaseDbModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        Index('uem_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    email_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _email = Column("email", String(255), nullable=False, unique=True)
 
    user = relationship('User')
 

	
 
    @validates('_email')
 
    def validate_email(self, key, email):
 
        # check if this email is not main one
 
        main_email = Session().query(User).filter(User.email == email).scalar()
 
        if main_email is not None:
 
            raise AttributeError('email %s is present is user table' % email)
 
        return email
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 

	
 
class UserIpMap(Base, BaseDbModel):
 
    __tablename__ = 'user_ip_map'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'ip_addr'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    ip_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    ip_addr = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    user = relationship('User')
 

	
 
    @classmethod
 
    def _get_ip_range(cls, ip_addr):
 
        from kallithea.lib import ipaddr
 
        net = ipaddr.IPNetwork(address=ip_addr)
 
        return [str(net.network), str(net.broadcast)]
 

	
 
    def __json__(self):
 
        return dict(
 
          ip_addr=self.ip_addr,
 
          ip_range=self._get_ip_range(self.ip_addr)
 
        )
 

	
 
    def __unicode__(self):
 
        return u"<%s('user_id:%s=>%s')>" % (self.__class__.__name__,
 
                                            self.user_id, self.ip_addr)
 

	
 

	
 
class UserLog(Base, BaseDbModel):
 
    __tablename__ = 'user_logs'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    user_log_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
 
    username = Column(String(255), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    repository_name = Column(Unicode(255), nullable=False)
 
    user_ip = Column(String(255), nullable=True)
 
    action = Column(UnicodeText(), nullable=False)
 
    action_date = Column(DateTime(timezone=False), nullable=False)
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.repository_name,
 
                                      self.action)
 

	
 
    @property
 
    def action_as_day(self):
 
        return datetime.date(*self.action_date.timetuple()[:3])
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository', cascade='')
 

	
 

	
 
class UserGroup(Base, BaseDbModel):
 
    __tablename__ = 'users_groups'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_id = Column(Integer(), primary_key=True)
 
    users_group_name = Column(Unicode(255), nullable=False, unique=True)
 
    user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
 
    users_group_active = Column(Boolean(), nullable=False)
 
    inherit_default_permissions = Column("users_group_inherit_default_permissions", Boolean(), nullable=False, default=True)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _group_data = Column("group_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    members = relationship('UserGroupMember', cascade="all, delete-orphan")
 
    users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
 
    users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    user_user_group_to_perm = relationship('UserUserGroupToPerm ', cascade='all')
 
    user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm ', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def group_data(self):
 
        if not self._group_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._group_data)
 
        except TypeError:
 
            return {}
 

	
 
    @group_data.setter
 
    def group_data(self, val):
 
        try:
 
            self._group_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.users_group_id,
 
                                      self.users_group_name)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False,
 
                          case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.users_group_name) == func.lower(group_name))
 
        else:
 
            q = cls.query().filter(cls.users_group_name == group_name)
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get(cls, user_group_id, cache=False):
 
        user_group = cls.query()
 
        if cache:
 
            user_group = user_group.options(FromCache("sql_cache_short",
 
                                    "get_users_group_%s" % user_group_id))
 
        return user_group.get(user_group_id)
 

	
 
    def get_api_data(self, with_members=True):
 
        user_group = self
 

	
 
        data = dict(
 
            users_group_id=user_group.users_group_id,
 
            group_name=user_group.users_group_name,
 
            group_description=user_group.user_group_description,
 
            active=user_group.users_group_active,
 
            owner=user_group.owner.username,
 
        )
 
        if with_members:
 
            data['members'] = [
 
                ugm.user.get_api_data()
 
                for ugm in user_group.members
 
            ]
 

	
 
        return data
 

	
 

	
 
class UserGroupMember(Base, BaseDbModel):
 
    __tablename__ = 'users_groups_members'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_member_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    users_group = relationship('UserGroup')
 

	
 
    def __init__(self, gr_id='', u_id=''):
 
        self.users_group_id = gr_id
 
        self.user_id = u_id
 

	
 

	
 
class RepositoryField(Base, BaseDbModel):
 
    __tablename__ = 'repositories_fields'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'field_key'),  # no-multi field
 
        _table_args_default_dict,
 
    )
 

	
 
    PREFIX = 'ex_'  # prefix used in form to not conflict with already existing fields
 

	
 
    repo_field_id = Column(Integer(), primary_key=True)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    field_key = Column(String(250), nullable=False)
 
    field_label = Column(String(1024), nullable=False)
 
    field_value = Column(String(10000), nullable=False)
 
    field_desc = Column(String(1024), nullable=False)
 
    field_type = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    repository = relationship('Repository')
 

	
 
    @property
 
    def field_key_prefixed(self):
 
        return 'ex_%s' % self.field_key
 

	
 
    @classmethod
 
    def un_prefix_key(cls, key):
 
        if key.startswith(cls.PREFIX):
 
            return key[len(cls.PREFIX):]
 
        return key
 

	
 
    @classmethod
 
    def get_by_key_name(cls, key, repo):
 
        row = cls.query() \
 
                .filter(cls.repository == repo) \
 
                .filter(cls.field_key == key).scalar()
 
        return row
 

	
 

	
 
class Repository(Base, BaseDbModel):
 
    __tablename__ = 'repositories'
 
    __table_args__ = (
 
        Index('r_repo_name_idx', 'repo_name'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_CLONE_URI = '{scheme}://{user}@{netloc}/{repo}'
 
    DEFAULT_CLONE_URI_ID = '{scheme}://{user}@{netloc}/_{repoid}'
 

	
 
    STATE_CREATED = u'repo_state_created'
 
    STATE_PENDING = u'repo_state_pending'
 
    STATE_ERROR = u'repo_state_error'
 

	
 
    repo_id = Column(Integer(), primary_key=True)
 
    repo_name = Column(Unicode(255), nullable=False, unique=True)
 
    repo_state = Column(String(255), nullable=False)
 

	
 
    clone_uri = Column(String(255), nullable=True) # FIXME: not nullable?
 
    repo_type = Column(String(255), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    private = Column(Boolean(), nullable=False)
 
    enable_statistics = Column("statistics", Boolean(), nullable=False, default=True)
 
    enable_downloads = Column("downloads", Boolean(), nullable=False, default=True)
 
    description = Column(Unicode(10000), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _landing_revision = Column("landing_revision", String(255), nullable=False)
 
    enable_locking = Column(Boolean(), nullable=False, default=False)
 
    _locked = Column("locked", String(255), nullable=True) # FIXME: not nullable?
 
    _changeset_cache = Column("changeset_cache", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
 

	
 
    fork_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=True)
 

	
 
    owner = relationship('User')
 
    fork = relationship('Repository', remote_side=repo_id)
 
    group = relationship('RepoGroup')
 
    repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
 
    users_group_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    stats = relationship('Statistics', cascade='all', uselist=False)
 

	
 
    followers = relationship('UserFollowing',
 
                             primaryjoin='UserFollowing.follows_repository_id==Repository.repo_id',
 
                             cascade='all')
 
    extra_fields = relationship('RepositoryField',
 
                                cascade="all, delete-orphan")
 

	
 
    logs = relationship('UserLog')
 
    comments = relationship('ChangesetComment', cascade="all, delete-orphan")
 

	
 
    pull_requests_org = relationship('PullRequest',
 
                    primaryjoin='PullRequest.org_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    pull_requests_other = relationship('PullRequest',
 
                    primaryjoin='PullRequest.other_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
 
                                   safe_unicode(self.repo_name))
 

	
 
    @hybrid_property
 
    def landing_rev(self):
 
        # always should return [rev_type, rev]
 
        if self._landing_revision:
 
            _rev_info = self._landing_revision.split(':')
 
            if len(_rev_info) < 2:
 
                _rev_info.insert(0, 'rev')
 
            return [_rev_info[0], _rev_info[1]]
 
        return [None, None]
 

	
 
    @landing_rev.setter
 
    def landing_rev(self, val):
 
        if ':' not in val:
 
            raise ValueError('value must be delimited with `:` and consist '
 
                             'of <rev_type>:<rev>, got %s instead' % val)
 
        self._landing_revision = val
 

	
 
    @hybrid_property
 
    def locked(self):
 
        # always should return [user_id, timelocked]
 
        if self._locked:
 
            _lock_info = self._locked.split(':')
 
            return int(_lock_info[0]), _lock_info[1]
 
        return [None, None]
 

	
 
    @locked.setter
 
    def locked(self, val):
 
        if val and isinstance(val, (list, tuple)):
 
            self._locked = ':'.join(map(str, val))
 
        else:
 
            self._locked = None
 

	
 
    @hybrid_property
 
    def changeset_cache(self):
 
        try:
 
            cs_cache = json.loads(self._changeset_cache) # might raise on bad data
 
            cs_cache['raw_id'] # verify data, raise exception on error
 
            return cs_cache
 
        except (TypeError, KeyError, ValueError):
 
            return EmptyChangeset().__json__()
 

	
 
    @changeset_cache.setter
 
    def changeset_cache(self, val):
 
        try:
 
            self._changeset_cache = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    @classmethod
 
    def query(cls, sorted=False):
 
        """Add Repository-specific helpers for common query constructs.
 

	
 
        sorted: if True, apply the default ordering (name, case insensitive).
 
        """
 
        q = super(Repository, cls).query()
 

	
 
        if sorted:
 
            q = q.order_by(func.lower(Repository.repo_name))
 

	
 
        return q
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def normalize_repo_name(cls, repo_name):
 
        """
 
        Normalizes os specific repo_name to the format internally stored inside
 
        database using URL_SEP
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        return cls.url_sep().join(repo_name.split(os.sep))
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(Repository, cls).guess_instance(value, Repository.get_by_repo_name)
 

	
 
    @classmethod
 
    def get_by_repo_name(cls, repo_name, case_insensitive=False):
 
        """Get the repo, defaulting to database case sensitivity.
 
        case_insensitive will be slower and should only be specified if necessary."""
 
        if case_insensitive:
 
            q = Session().query(cls).filter(func.lower(cls.repo_name) == func.lower(repo_name))
 
        else:
 
            q = Session().query(cls).filter(cls.repo_name == repo_name)
 
        q = q.options(joinedload(Repository.fork)) \
 
                .options(joinedload(Repository.owner)) \
 
                .options(joinedload(Repository.group))
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_full_path(cls, repo_full_path):
 
        base_full_path = os.path.realpath(cls.base_path())
 
        repo_full_path = os.path.realpath(repo_full_path)
 
        assert repo_full_path.startswith(base_full_path + os.path.sep)
 
        repo_name = repo_full_path[len(base_full_path) + 1:]
 
        repo_name = cls.normalize_repo_name(repo_name)
 
        return cls.get_by_repo_name(repo_name.strip(URL_SEP))
 

	
 
    @classmethod
 
    def get_repo_forks(cls, repo_id):
 
        return cls.query().filter(Repository.fork_id == repo_id)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path where all repos are stored
 

	
 
        :param cls:
 
        """
 
        q = Session().query(Ui) \
 
            .filter(Ui.ui_key == cls.url_sep())
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return q.one().ui_value
 

	
 
    @property
 
    def forks(self):
 
        """
 
        Return forks of this repo
 
        """
 
        return Repository.get_repo_forks(self.repo_id)
 

	
 
    @property
 
    def parent(self):
 
        """
 
        Returns fork parent
 
        """
 
        return self.fork
 

	
 
    @property
 
    def just_name(self):
 
        return self.repo_name.split(Repository.url_sep())[-1]
 

	
 
    @property
 
    def groups_with_parents(self):
 
        groups = []
 
        group = self.group
 
        while group is not None:
 
            groups.append(group)
 
            group = group.parent_group
 
            assert group not in groups, group # avoid recursion on bad db content
 
        groups.reverse()
 
        return groups
 

	
 
    @LazyProperty
 
    def repo_path(self):
 
        """
 
        Returns base full path for that repository means where it actually
 
        exists on a filesystem
 
        """
 
        q = Session().query(Ui).filter(Ui.ui_key ==
 
                                              Repository.url_sep())
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return q.one().ui_value
 

	
 
    @property
 
    def repo_full_path(self):
 
        p = [self.repo_path]
 
        # we need to split the name by / since this is how we store the
 
        # names in the database, but that eventually needs to be converted
 
        # into a valid system path
 
        p += self.repo_name.split(Repository.url_sep())
 
        return os.path.join(*map(safe_unicode, p))
 

	
 
    @property
 
    def cache_keys(self):
 
        """
 
        Returns associated cache keys for that repo
 
        """
 
        return CacheInvalidation.query() \
 
            .filter(CacheInvalidation.cache_args == self.repo_name) \
 
            .order_by(CacheInvalidation.cache_key) \
 
            .all()
 

	
 
    def get_new_name(self, repo_name):
 
        """
 
        returns new full repository name based on assigned group and new new
 

	
 
        :param group_name:
 
        """
 
        path_prefix = self.group.full_path_splitted if self.group else []
 
        return Repository.url_sep().join(path_prefix + [repo_name])
 

	
 
    @property
 
    def _ui(self):
 
        """
 
        Creates an db based ui object for this repository
 
        """
 
        from kallithea.lib.utils import make_ui
 
        return make_ui('db', clear_session=False)
 

	
 
    @classmethod
 
    def is_valid(cls, repo_name):
 
        """
 
        returns True if given repo name is a valid filesystem repository
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        from kallithea.lib.utils import is_valid_repo
 

	
 
        return is_valid_repo(repo_name, cls.base_path())
 

	
 
    def get_api_data(self, with_revision_names=False,
 
                           with_pullrequests=False):
 
        """
 
        Common function for generating repo api data.
 
        Optionally, also return tags, branches, bookmarks and PRs.
 
        """
 
        repo = self
 
        data = dict(
 
            repo_id=repo.repo_id,
 
            repo_name=repo.repo_name,
 
            repo_type=repo.repo_type,
 
            clone_uri=repo.clone_uri,
 
            private=repo.private,
 
            created_on=repo.created_on,
 
            description=repo.description,
 
            landing_rev=repo.landing_rev,
 
            owner=repo.owner.username,
 
            fork_of=repo.fork.repo_name if repo.fork else None,
 
            enable_statistics=repo.enable_statistics,
 
            enable_locking=repo.enable_locking,
 
            enable_downloads=repo.enable_downloads,
 
            last_changeset=repo.changeset_cache,
 
            locked_by=User.get(self.locked[0]).get_api_data() \
 
                if self.locked[0] else None,
 
            locked_date=time_to_datetime(self.locked[1]) \
 
                if self.locked[1] else None
 
        )
 
        if with_revision_names:
 
            scm_repo = repo.scm_instance_no_cache()
 
            data.update(dict(
 
                tags=scm_repo.tags,
 
                branches=scm_repo.branches,
 
                bookmarks=scm_repo.bookmarks,
 
            ))
 
        if with_pullrequests:
 
            data['pull_requests'] = repo.pull_requests_other
 
        rc_config = Setting.get_app_settings()
 
        repository_fields = str2bool(rc_config.get('repository_fields'))
 
        if repository_fields:
 
            for f in self.extra_fields:
 
                data[f.field_key_prefixed] = f.field_value
 

	
 
        return data
 

	
 
    @classmethod
 
    def lock(cls, repo, user_id, lock_time=None):
 
        if lock_time is not None:
 
            lock_time = time.time()
 
        repo.locked = [user_id, lock_time]
 
        Session().commit()
 

	
 
    @classmethod
 
    def unlock(cls, repo):
 
        repo.locked = None
 
        Session().commit()
 

	
 
    @classmethod
 
    def getlock(cls, repo):
0 comments (0 inline, 0 general)