Changeset - 0456028c4ffe
[Not reviewed]
default
0 9 0
Mads Kiilerich - 6 years ago 2020-04-12 02:43:07
mads@kiilerich.com
Grafted from: 030f092b2f16
db: introduce kallithea.DEFAULT_USER_ID to avoid repeated get_default_user()
9 files changed with 29 insertions and 24 deletions:
0 comments (0 inline, 0 general)
kallithea/config/app_cfg.py
Show inline comments
 
@@ -21,48 +21,49 @@ import logging
 
import os
 
import platform
 
import sys
 

	
 
import alembic.config
 
import mercurial
 
import tg
 
from alembic.migration import MigrationContext
 
from alembic.script.base import ScriptDirectory
 
from sqlalchemy import create_engine
 
from tg.configuration import AppConfig
 
from tg.support.converters import asbool
 

	
 
import kallithea.lib.locale
 
import kallithea.model.base
 
import kallithea.model.meta
 
from kallithea.lib import celerypylons
 
from kallithea.lib.middleware.https_fixup import HttpsFixup
 
from kallithea.lib.middleware.permanent_repo_url import PermanentRepoUrl
 
from kallithea.lib.middleware.simplegit import SimpleGit
 
from kallithea.lib.middleware.simplehg import SimpleHg
 
from kallithea.lib.middleware.wrapper import RequestWrapper
 
from kallithea.lib.utils import check_git_version, load_rcextensions, set_app_settings, set_indexer_config, set_vcs_config
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.model import db
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class KallitheaAppConfig(AppConfig):
 
    # Note: AppConfig has a misleading name, as it's not the application
 
    # configuration, but the application configurator. The AppConfig values are
 
    # used as a template to create the actual configuration, which might
 
    # overwrite or extend the one provided by the configurator template.
 

	
 
    # To make it clear, AppConfig creates the config and sets into it the same
 
    # values that AppConfig itself has. Then the values from the config file and
 
    # gearbox options are loaded and merged into the configuration. Then an
 
    # after_init_config(conf) method of AppConfig is called for any change that
 
    # might depend on options provided by configuration files.
 

	
 
    def __init__(self):
 
        super(KallitheaAppConfig, self).__init__()
 

	
 
        self['package'] = kallithea
 

	
 
        self['prefer_toscawidgets2'] = False
 
        self['use_toscawidgets'] = False
 
@@ -136,48 +137,50 @@ def setup_configuration(app):
 
        log.warn('database alembic revision checking is disabled')
 
    else:
 
        dbconf = config['sqlalchemy.url']
 
        alembic_cfg = alembic.config.Config()
 
        alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
 
        alembic_cfg.set_main_option('sqlalchemy.url', dbconf)
 
        script_dir = ScriptDirectory.from_config(alembic_cfg)
 
        available_heads = sorted(script_dir.get_heads())
 

	
 
        engine = create_engine(dbconf)
 
        with engine.connect() as conn:
 
            context = MigrationContext.configure(conn)
 
            current_heads = sorted(str(s) for s in context.get_current_heads())
 
        if current_heads != available_heads:
 
            log.error('Failed to run Kallithea:\n\n'
 
                      'The database version does not match the Kallithea version.\n'
 
                      'Please read the documentation on how to upgrade or downgrade the database.\n'
 
                      'Current database version id(s): %s\n'
 
                      'Expected database version id(s): %s\n'
 
                      'If you are a developer and you know what you are doing, you can add `ignore_alembic_revision = True` '
 
                      'to your .ini file to skip the check.\n' % (' '.join(current_heads), ' '.join(available_heads)))
 
            sys.exit(1)
 

	
 
    # store some globals into kallithea
 
    kallithea.DEFAULT_USER_ID = db.User.get_default_user().user_id
 

	
 
    if str2bool(config.get('use_celery')):
 
        kallithea.CELERY_APP = celerypylons.make_app()
 
    kallithea.CONFIG = config
 

	
 
    load_rcextensions(root_path=config['here'])
 

	
 
    set_app_settings(config)
 

	
 
    instance_id = kallithea.CONFIG.get('instance_id', '*')
 
    if instance_id == '*':
 
        instance_id = '%s-%s' % (platform.uname()[1], os.getpid())
 
        kallithea.CONFIG['instance_id'] = instance_id
 

	
 
    # update kallithea.CONFIG with the meanwhile changed 'config'
 
    kallithea.CONFIG.update(config)
 

	
 
    # configure vcs and indexer libraries (they are supposed to be independent
 
    # as much as possible and thus avoid importing tg.config or
 
    # kallithea.CONFIG).
 
    set_vcs_config(kallithea.CONFIG)
 
    set_indexer_config(kallithea.CONFIG)
 

	
 
    check_git_version()
 

	
kallithea/controllers/admin/repos.py
Show inline comments
 
@@ -24,49 +24,49 @@ Original author and date, and relevant c
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import celery.result
 
import formencode
 
from formencode import htmlfill
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPForbidden, HTTPFound, HTTPInternalServerError, HTTPNotFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasPermissionAny, HasRepoPermissionLevelDecorator, LoginRequired, NotAnonymous
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import safe_int
 
from kallithea.lib.vcs import RepositoryError
 
from kallithea.model.db import RepoGroup, Repository, RepositoryField, Setting, User, UserFollowing
 
from kallithea.model.db import RepoGroup, Repository, RepositoryField, Setting, UserFollowing
 
from kallithea.model.forms import RepoFieldForm, RepoForm, RepoPermsForm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import AvailableRepoGroupChoices, RepoList, ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ReposController(BaseRepoController):
 
    """
 
    REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('repo', 'repos')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def _before(self, *args, **kwargs):
 
        super(ReposController, self)._before(*args, **kwargs)
 

	
 
    def _load_repo(self):
 
        repo_obj = c.db_repo
 

	
 
        if repo_obj is None:
 
@@ -384,86 +384,86 @@ class ReposController(BaseRepoController
 
            Session().add(new_field)
 
            Session().commit()
 
        except formencode.Invalid as e:
 
            h.flash(_('Field validation error: %s') % e.msg, category='error')
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during creation of field: %r') % e, category='error')
 
        raise HTTPFound(location=url('edit_repo_fields', repo_name=repo_name))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def delete_repo_field(self, repo_name, field_id):
 
        field = RepositoryField.get_or_404(field_id)
 
        try:
 
            Session().delete(field)
 
            Session().commit()
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            msg = _('An error occurred during removal of field')
 
            h.flash(msg, category='error')
 
        raise HTTPFound(location=url('edit_repo_fields', repo_name=repo_name))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_advanced(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        c.default_user_id = User.get_default_user().user_id
 
        c.default_user_id = kallithea.DEFAULT_USER_ID
 
        c.in_public_journal = UserFollowing.query() \
 
            .filter(UserFollowing.user_id == c.default_user_id) \
 
            .filter(UserFollowing.follows_repository == c.repo_info).scalar()
 

	
 
        _repos = Repository.query(sorted=True).all()
 
        read_access_repos = RepoList(_repos, perm_level='read')
 
        c.repos_list = [(None, _('-- Not a fork --'))]
 
        c.repos_list += [(x.repo_id, x.repo_name)
 
                         for x in read_access_repos
 
                         if x.repo_id != c.repo_info.repo_id
 
                         and x.repo_type == c.repo_info.repo_type]
 

	
 
        defaults = {
 
            'id_fork_of': c.repo_info.fork_id if c.repo_info.fork_id else ''
 
        }
 

	
 
        c.active = 'advanced'
 
        if request.POST:
 
            raise HTTPFound(location=url('repo_edit_advanced'))
 
        return htmlfill.render(
 
            render('admin/repos/repo_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_advanced_journal(self, repo_name):
 
        """
 
        Sets this repository to be visible in public journal,
 
        in other words asking default user to follow this repo
 

	
 
        :param repo_name:
 
        """
 

	
 
        try:
 
            repo_id = Repository.get_by_repo_name(repo_name).repo_id
 
            user_id = User.get_default_user().user_id
 
            user_id = kallithea.DEFAULT_USER_ID
 
            self.scm_model.toggle_following_repo(repo_id, user_id)
 
            h.flash(_('Updated repository visibility in public journal'),
 
                    category='success')
 
            Session().commit()
 
        except Exception:
 
            h.flash(_('An error occurred during setting this'
 
                      ' repository in public journal'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_repo_advanced', repo_name=repo_name))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_advanced_fork(self, repo_name):
 
        """
 
        Mark given repository as a fork of another
 

	
 
        :param repo_name:
 
        """
 
        try:
 
            fork_id = request.POST.get('id_fork_of')
 
            repo = ScmModel().mark_as_fork(repo_name, fork_id,
 
                                           request.authuser.username)
 
            fork = repo.fork.repo_name if repo.fork else _('Nothing')
 
            Session().commit()
 
            h.flash(_('Marked repository %s as fork of %s') % (repo_name, fork),
kallithea/controllers/admin/users.py
Show inline comments
 
@@ -15,48 +15,49 @@
 
kallithea.controllers.admin.users
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Users crud controller
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from sqlalchemy.sql.expression import func
 
from tg import app_globals, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound, HTTPNotFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib import auth_modules
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, IfSshEnabled, render
 
from kallithea.lib.exceptions import DefaultUserException, UserCreationError, UserOwnsReposException
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import datetime_to_time, generate_api_key, safe_int
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
 
from kallithea.model.forms import CustomDefaultPermissionsForm, UserForm
 
from kallithea.model.meta import Session
 
from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UsersController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin')
 
@@ -366,49 +367,49 @@ class UsersController(BaseController):
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def delete_email(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def edit_ips(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ips'
 
        c.user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == c.user).all()
 

	
 
        c.default_user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == User.get_default_user()).all()
 
            .filter(UserIpMap.user_id == kallithea.DEFAULT_USER_ID).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_ip(self, id):
 
        ip = request.POST.get('new_ip')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_ip(id, ip)
 
            Session().commit()
 
            h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['ip']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred while adding IP address'),
 
                    category='error')
 

	
kallithea/controllers/forks.py
Show inline comments
 
@@ -14,90 +14,91 @@
 
"""
 
kallithea.controllers.forks
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
forks controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 23, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
import kallithea
 
import kallithea.lib.helpers as h
 
from kallithea.config.routing import url
 
from kallithea.lib.auth import HasPermissionAny, HasPermissionAnyDecorator, HasRepoPermissionLevel, HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import safe_int
 
from kallithea.model.db import Repository, Ui, User, UserFollowing
 
from kallithea.model.db import Repository, Ui, UserFollowing
 
from kallithea.model.forms import RepoForkForm
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import AvailableRepoGroupChoices, ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ForksController(BaseRepoController):
 

	
 
    def __load_defaults(self):
 
        if HasPermissionAny('hg.create.write_on_repogroup.true')():
 
            repo_group_perm_level = 'write'
 
        else:
 
            repo_group_perm_level = 'admin'
 
        c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perm_level)
 

	
 
        c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs()
 

	
 
        c.can_update = Ui.get_by_key('hooks', Ui.HOOK_UPDATE).ui_active
 

	
 
    def __load_data(self):
 
        """
 
        Load defaults settings for edit, and update
 
        """
 
        self.__load_defaults()
 

	
 
        c.repo_info = c.db_repo
 
        repo = c.db_repo.scm_instance
 

	
 
        if c.repo_info is None:
 
            h.not_mapped_error(c.repo_name)
 
            raise HTTPFound(location=url('repos'))
 

	
 
        c.default_user_id = User.get_default_user().user_id
 
        c.default_user_id = kallithea.DEFAULT_USER_ID
 
        c.in_public_journal = UserFollowing.query() \
 
            .filter(UserFollowing.user_id == c.default_user_id) \
 
            .filter(UserFollowing.follows_repository == c.repo_info).scalar()
 

	
 
        if c.repo_info.stats:
 
            last_rev = c.repo_info.stats.stat_on_revision + 1
 
        else:
 
            last_rev = 0
 
        c.stats_revision = last_rev
 

	
 
        c.repo_last_rev = repo.count() if repo.revisions else 0
 

	
 
        if last_rev == 0 or c.repo_last_rev == 0:
 
            c.stats_percentage = 0
 
        else:
 
            c.stats_percentage = '%.2f' % ((float((last_rev)) /
 
                                            c.repo_last_rev) * 100)
 

	
 
        defaults = RepoModel()._get_defaults(c.repo_name)
 
        # alter the description to indicate a fork
 
        defaults['description'] = ('fork of repository: %s \n%s'
 
                                   % (defaults['repo_name'],
 
                                      defaults['description']))
 
        # add suffix to fork
kallithea/lib/auth.py
Show inline comments
 
@@ -18,53 +18,54 @@ kallithea.lib.auth
 
authentication and permission libraries
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import hashlib
 
import itertools
 
import logging
 
import os
 
import string
 

	
 
import bcrypt
 
import ipaddr
 
from decorator import decorator
 
from sqlalchemy.orm import joinedload
 
from sqlalchemy.orm.exc import ObjectDeletedError
 
from tg import request
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPForbidden, HTTPFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib.utils import get_repo_group_slug, get_repo_slug, get_user_group_slug
 
from kallithea.lib.utils2 import ascii_bytes, ascii_str, safe_bytes
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.db import (Permission, User, UserApiKeys, UserGroup, UserGroupMember, UserGroupRepoGroupToPerm, UserGroupRepoToPerm, UserGroupToPerm,
 
from kallithea.model.db import (Permission, UserApiKeys, UserGroup, UserGroupMember, UserGroupRepoGroupToPerm, UserGroupRepoToPerm, UserGroupToPerm,
 
                                UserGroupUserGroupToPerm, UserIpMap, UserToPerm)
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PasswordGenerator(object):
 
    """
 
    This is a simple class for generating password from different sets of
 
    characters
 
    usage::
 

	
 
        passwd_gen = PasswordGenerator()
 
        #print 8-letter password containing only big and small letters
 
            of alphabet
 
        passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
 
    """
 
    ALPHABETS_NUM = r'''1234567890'''
 
    ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
 
    ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
 
    ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
 
    ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
 
@@ -117,89 +118,86 @@ def check_password(password, hashed):
 

	
 

	
 
def _cached_perms_data(user_id, user_is_admin):
 
    RK = 'repositories'
 
    GK = 'repositories_groups'
 
    UK = 'user_groups'
 
    GLOBAL = 'global'
 
    PERM_WEIGHTS = Permission.PERM_WEIGHTS
 
    permissions = {RK: {}, GK: {}, UK: {}, GLOBAL: set()}
 

	
 
    def bump_permission(kind, key, new_perm):
 
        """Add a new permission for kind and key.
 
        Assuming the permissions are comparable, set the new permission if it
 
        has higher weight, else drop it and keep the old permission.
 
        """
 
        cur_perm = permissions[kind][key]
 
        new_perm_val = PERM_WEIGHTS[new_perm]
 
        cur_perm_val = PERM_WEIGHTS[cur_perm]
 
        if new_perm_val > cur_perm_val:
 
            permissions[kind][key] = new_perm
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_user = User.get_by_username('default')
 
    default_user_id = default_user.user_id
 

	
 
    default_repo_perms = Permission.get_default_perms(default_user_id)
 
    default_repo_groups_perms = Permission.get_default_group_perms(default_user_id)
 
    default_user_group_perms = Permission.get_default_user_group_perms(default_user_id)
 
    default_repo_perms = Permission.get_default_perms(kallithea.DEFAULT_USER_ID)
 
    default_repo_groups_perms = Permission.get_default_group_perms(kallithea.DEFAULT_USER_ID)
 
    default_user_group_perms = Permission.get_default_user_group_perms(kallithea.DEFAULT_USER_ID)
 

	
 
    if user_is_admin:
 
        #==================================================================
 
        # admin users have all rights;
 
        # based on default permissions, just set everything to admin
 
        #==================================================================
 
        permissions[GLOBAL].add('hg.admin')
 
        permissions[GLOBAL].add('hg.create.write_on_repogroup.true')
 

	
 
        # repositories
 
        for perm in default_repo_perms:
 
            r_k = perm.repository.repo_name
 
            p = 'repository.admin'
 
            permissions[RK][r_k] = p
 

	
 
        # repository groups
 
        for perm in default_repo_groups_perms:
 
            rg_k = perm.group.group_name
 
            p = 'group.admin'
 
            permissions[GK][rg_k] = p
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            permissions[UK][u_k] = p
 
        return permissions
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # default global permissions taken from the default user
 
    default_global_perms = UserToPerm.query() \
 
        .filter(UserToPerm.user_id == default_user_id) \
 
        .filter(UserToPerm.user_id == kallithea.DEFAULT_USER_ID) \
 
        .options(joinedload(UserToPerm.permission))
 

	
 
    for perm in default_global_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # defaults for repositories, taken from default user
 
    for perm in default_repo_perms:
 
        r_k = perm.repository.repo_name
 
        if perm.repository.owner_id == user_id:
 
            p = 'repository.admin'
 
        elif perm.repository.private:
 
            p = 'repository.none'
 
        else:
 
            p = perm.permission.permission_name
 
        permissions[RK][r_k] = p
 

	
 
    # defaults for repository groups taken from default user permission
 
    # on given group
 
    for perm in default_repo_groups_perms:
 
        rg_k = perm.group.group_name
 
        p = perm.permission.permission_name
 
        permissions[GK][rg_k] = p
 

	
 
    # defaults for user groups taken from default user permission
 
@@ -523,50 +521,49 @@ class AuthUser(object):
 
        return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
    def to_cookie(self):
 
        """ Serializes this login session to a cookie `dict`. """
 
        return {
 
            'user_id': self.user_id,
 
            'is_external_auth': self.is_external_auth,
 
        }
 

	
 
    @staticmethod
 
    def from_cookie(cookie, ip_addr):
 
        """
 
        Deserializes an `AuthUser` from a cookie `dict` ... or return None.
 
        """
 
        return AuthUser.make(
 
            dbuser=UserModel().get(cookie.get('user_id')),
 
            is_external_auth=cookie.get('is_external_auth', False),
 
            ip_addr=ip_addr,
 
        )
 

	
 
    @classmethod
 
    def get_allowed_ips(cls, user_id):
 
        _set = set()
 

	
 
        default_ips = UserIpMap.query().filter(UserIpMap.user_id ==
 
                                        User.get_default_user().user_id)
 
        default_ips = UserIpMap.query().filter(UserIpMap.user_id == kallithea.DEFAULT_USER_ID)
 
        for ip in default_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 

	
 
        user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
 
        for ip in user_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 
        return _set or set(['0.0.0.0/0', '::/0'])
 

	
 

	
 
#==============================================================================
 
# CHECK DECORATORS
 
#==============================================================================
 

	
 
def _redirect_to_login(message=None):
kallithea/tests/conftest.py
Show inline comments
 
@@ -124,49 +124,49 @@ def set_test_settings():
 
    yield _set_settings
 
    # Restore settings.
 
    session = Session()
 
    keys = frozenset(k for (k, v, t) in settings_snapshot)
 
    for s in Setting.query().all():
 
        if s.app_settings_name not in keys:
 
            session.delete(s)
 
    for k, v, t in settings_snapshot:
 
        if t == 'list' and hasattr(v, '__iter__'):
 
            v = ','.join(v) # Quirk: must format list value manually.
 
        Setting.create_or_update(k, v, t)
 
    session.commit()
 

	
 

	
 
@pytest.fixture
 
def auto_clear_ip_permissions():
 
    """Fixture that provides nothing but clearing IP permissions upon test
 
    exit. This clearing is needed to avoid other test failing to make fake http
 
    accesses."""
 
    yield
 
    # cleanup
 
    user_model = UserModel()
 

	
 
    user_ids = []
 
    user_ids.append(User.get_default_user().user_id)
 
    user_ids.append(kallithea.DEFAULT_USER_ID)
 
    user_ids.append(User.get_by_username(TEST_USER_REGULAR_LOGIN).user_id)
 

	
 
    for user_id in user_ids:
 
        for ip in UserIpMap.query().filter(UserIpMap.user_id == user_id):
 
            user_model.delete_extra_ip(user_id, ip.ip_id)
 

	
 
    # IP permissions are cached, need to invalidate this cache explicitly
 
    invalidate_all_caches()
 
    session = Session()
 
    session.commit()
 

	
 

	
 
@pytest.fixture
 
def test_context_fixture(app_fixture):
 
    """
 
    Encompass the entire test using this fixture in a test_context,
 
    making sure that certain functionality still works even if no call to
 
    self.app.get/post has been made.
 
    The typical error message indicating you need a test_context is:
 
        TypeError: No object (name: context) has been registered for this thread
 

	
 
    The standard way to fix this is simply using the test_context context
 
    manager directly inside your test:
 
        with test_context(self.app):
kallithea/tests/functional/test_admin_permissions.py
Show inline comments
 
import kallithea
 
from kallithea.model.db import User, UserIpMap
 
from kallithea.tests import base
 

	
 

	
 
class TestAdminPermissionsController(base.TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_permissions'))
 
        # Test response...
 

	
 
    def test_index_ips(self):
 
        self.log_user()
 
        response = self.app.get(base.url('admin_permissions_ips'))
 
        # Test response...
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    def test_add_delete_ips(self, auto_clear_ip_permissions):
 
        self.log_user()
 
        default_user_id = User.get_default_user().user_id
 
        default_user_id = kallithea.DEFAULT_USER_ID
 

	
 
        # Add IP and verify it is shown in UI and both gives access and rejects
 

	
 
        response = self.app.post(base.url('edit_user_ips_update', id=default_user_id),
 
                                 params=dict(new_ip='0.0.0.0/24',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        base.invalidate_all_caches()
 
        response = self.app.get(base.url('admin_permissions_ips'),
 
                                extra_environ={'REMOTE_ADDR': '0.0.0.1'})
 
        response.mustcontain('0.0.0.0/24')
 
        response.mustcontain('0.0.0.0 - 0.0.0.255')
 

	
 
        response = self.app.get(base.url('admin_permissions_ips'),
 
                                extra_environ={'REMOTE_ADDR': '0.0.1.1'}, status=403)
 

	
 
        # Add another IP and verify previously rejected now works
 

	
 
        response = self.app.post(base.url('edit_user_ips_update', id=default_user_id),
 
                                 params=dict(new_ip='0.0.1.0/24',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        base.invalidate_all_caches()
 

	
 
        response = self.app.get(base.url('admin_permissions_ips'),
 
                                extra_environ={'REMOTE_ADDR': '0.0.1.1'})
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import pytest
 
from sqlalchemy.orm.exc import NoResultFound
 
from tg.util.webtest import test_context
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.controllers.admin.users import UsersController
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import check_password
 
from kallithea.model import validators
 
from kallithea.model.db import Permission, RepoGroup, User, UserApiKeys, UserSshKeys
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
@pytest.fixture
 
def user_and_repo_group_fail():
 
    username = 'repogrouperr'
 
    groupname = 'repogroup_fail'
 
    user = fixture.create_user(name=username)
 
    repo_group = fixture.create_repo_group(name=groupname, cur_user=username)
 
    yield user, repo_group
 
    # cleanup
 
    if RepoGroup.get_by_group_name(groupname):
 
        fixture.destroy_repo_group(repo_group)
 
@@ -557,49 +558,49 @@ class TestAdminUsersController(base.Test
 

	
 
        response = self.app.post(base.url('edit_user_ssh_keys_delete', id=user_id),
 
                                 {'del_public_key_fingerprint': ssh_key.fingerprint,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key successfully deleted')
 
        keys = UserSshKeys.query().all()
 
        assert 0 == len(keys)
 

	
 

	
 
class TestAdminUsersController_unittest(base.TestController):
 
    """ Unit tests for the users controller """
 

	
 
    def test_get_user_or_raise_if_default(self, monkeypatch, test_context_fixture):
 
        # flash complains about an non-existing session
 
        def flash_mock(*args, **kwargs):
 
            pass
 
        monkeypatch.setattr(h, 'flash', flash_mock)
 

	
 
        u = UsersController()
 
        # a regular user should work correctly
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        assert u._get_user_or_raise_if_default(user.user_id) == user
 
        # the default user should raise
 
        with pytest.raises(HTTPNotFound):
 
            u._get_user_or_raise_if_default(User.get_default_user().user_id)
 
            u._get_user_or_raise_if_default(kallithea.DEFAULT_USER_ID)
 

	
 

	
 
class TestAdminUsersControllerForDefaultUser(base.TestController):
 
    """
 
    Edit actions on the default user are not allowed.
 
    Validate that they throw a 404 exception.
 
    """
 
    def test_edit_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(base.url('edit_user', id=user.user_id), status=404)
 

	
 
    def test_edit_advanced_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(base.url('edit_user_advanced', id=user.user_id), status=404)
 

	
 
    # API keys
 
    def test_edit_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(base.url('edit_user_api_keys', id=user.user_id), status=404)
 

	
 
    def test_add_api_keys_default_user(self):
kallithea/tests/models/test_user_permissions_on_repo_groups.py
Show inline comments
 
import functools
 

	
 
from kallithea.model.db import RepoGroup, Repository, User
 
import kallithea
 
from kallithea.model.db import RepoGroup, Repository
 
from kallithea.model.meta import Session
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.tests.models.common import _check_expected_count, _create_project_tree, _destroy_project_tree, _get_perms, check_tree_perms, expected_count
 

	
 

	
 
test_u1_id = None
 
_get_repo_perms = None
 
_get_group_perms = None
 

	
 

	
 
def permissions_setup_func(group_name='g0', perm='group.read', recursive='all',
 
                           user_id=None):
 
    """
 
    Resets all permissions to perm attribute
 
    """
 
    if not user_id:
 
        user_id = test_u1_id
 
        permissions_setup_func(group_name, perm, recursive,
 
                               user_id=User.get_default_user().user_id)
 
                               user_id=kallithea.DEFAULT_USER_ID)
 

	
 
    repo_group = RepoGroup.get_by_group_name(group_name=group_name)
 
    if not repo_group:
 
        raise Exception('Cannot get group %s' % group_name)
 

	
 
    # Start with a baseline that current group can read recursive
 
    perms_updates = [[user_id, 'group.read', 'user']]
 
    RepoGroupModel()._update_permissions(repo_group,
 
                                         perms_updates=perms_updates,
 
                                         recursive='all', check_perms=False)
 

	
 
    perms_updates = [[user_id, perm, 'user']]
 
    RepoGroupModel()._update_permissions(repo_group,
 
                                         perms_updates=perms_updates,
 
                                         recursive=recursive, check_perms=False)
 
    Session().commit()
 

	
 

	
 
def setup_module():
 
    global test_u1_id, _get_repo_perms, _get_group_perms
 
    test_u1 = _create_project_tree()
 
    Session().commit()
 
    test_u1_id = test_u1.user_id
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
@@ -95,49 +96,49 @@ def test_user_permissions_on_group_with_
 

	
 
    # set permission to g0 recursive mode, all children including
 
    # other repos and groups should have this permission now set !
 
    recursive = 'all'
 
    group = 'g0'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.write')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_for_default_user():
 

	
 
    # set permission to g0 recursive mode, all children including
 
    # other repos and groups should have this permission now set !
 
    recursive = 'all'
 
    group = 'g0'
 
    default_user_id = User.get_default_user().user_id
 
    default_user_id = kallithea.DEFAULT_USER_ID
 
    permissions_setup_func(group, 'group.write', recursive=recursive,
 
                           user_id=default_user_id)
 

	
 
    # change default to get perms for default user
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=default_user_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=default_user_id)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        # default user permissions do not "recurse into" private repos
 
        is_private = Repository.get_by_repo_name(name).private
 
        check_tree_perms(name, perm, group, 'repository.none' if is_private else 'repository.write')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_inner_group():
 
    ## set permission to g0_3 group to none
 
@@ -175,49 +176,49 @@ def test_user_permissions_on_group_with_
 

	
 
def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
 
    ## set permission to g0_3 group to none
 
    recursive = 'all'
 
    group = 'g0/g0_2'
 
    permissions_setup_func(group, 'group.admin', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.admin')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.admin')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_repo_mode_for_default_user():
 
    # set permission to g0/g0_1 recursive repos only mode, all children including
 
    # other repos should have this permission now set, inner groups are excluded!
 
    recursive = 'repos'
 
    group = 'g0/g0_1'
 
    perm = 'group.none'
 
    default_user_id = User.get_default_user().user_id
 
    default_user_id = kallithea.DEFAULT_USER_ID
 

	
 
    permissions_setup_func(group, perm, recursive=recursive,
 
                           user_id=default_user_id)
 

	
 
    # change default to get perms for default user
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=default_user_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=default_user_id)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.none')
 

	
 
    for name, perm in items:
 
        # permission is set with repos only mode, but we also change the permission
 
        # on the group we trigger the apply to children from, thus we need
 
        # to change its permission check
 
        old_perm = 'group.read'
 
        if name == group:
 
            old_perm = perm
 
@@ -233,49 +234,49 @@ def test_user_permissions_on_group_with_
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.none')
 

	
 
    for name, perm in items:
 
        # permission is set with repos only mode, but we also change the permission
 
        # on the group we trigger the apply to children from, thus we need
 
        # to change its permission check
 
        old_perm = 'group.read'
 
        if name == group:
 
            old_perm = perm
 
        check_tree_perms(name, perm, group, old_perm)
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_group_mode_for_default_user():
 
    # set permission to g0/g0_1 with recursive groups only mode, all children including
 
    # other groups should have this permission now set. repositories should
 
    # remain intact as we use groups only mode !
 
    recursive = 'groups'
 
    group = 'g0/g0_1'
 
    default_user_id = User.get_default_user().user_id
 
    default_user_id = kallithea.DEFAULT_USER_ID
 
    permissions_setup_func(group, 'group.write', recursive=recursive,
 
                           user_id=default_user_id)
 

	
 
    # change default to get perms for default user
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=default_user_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=default_user_id)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.read')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_group_mode_inner_group():
 
    ## set permission to g0_3 group to none, with recursive mode for groups only
 
    recursive = 'groups'
 
    group = 'g0/g0_3'
0 comments (0 inline, 0 general)