Changeset - 401fe08bc6b8
[Not reviewed]
default
0 5 0
Thomas De Schampheleire - 7 years ago 2018-06-09 21:24:45
thomas.de_schampheleire@nokia.com
utils: move repo_name_slug to utils2 to prevent import cycle on setup_db

After commit 57a733313e4f, 'gearbox setup-db -c my.ini' fails with an import
cycle as follows:

Traceback (most recent call last):
File "/home/tdescham/repo/contrib/kallithea/venv/kallithea-release/bin/gearbox", line 11, in <module>
sys.exit(main())
File "/home/tdescham/repo/contrib/kallithea/venv/kallithea-release/lib/python2.7/site-packages/gearbox/main.py", line 199, in main
return gearbox.run(args)
File "/home/tdescham/repo/contrib/kallithea/venv/kallithea-release/lib/python2.7/site-packages/gearbox/main.py", line 145, in run
return self._run_subcommand(remainder)
File "/home/tdescham/repo/contrib/kallithea/venv/kallithea-release/lib/python2.7/site-packages/gearbox/main.py", line 149, in _run_subcommand
subcommand = self.command_manager.find_command(argv)
File "/home/tdescham/repo/contrib/kallithea/venv/kallithea-release/lib/python2.7/site-packages/gearbox/commandmanager.py", line 78, in find_command
cmd_factory = cmd_ep.resolve()
File "/home/tdescham/repo/contrib/kallithea/venv/kallithea-release/lib/python2.7/site-packages/pkg_resources/__init__.py", line 2324, in resolve
module = __import__(self.module_name, fromlist=['__name__'], level=0)
File "/home/tdescham/repo/contrib/kallithea/kallithea-release/kallithea/lib/paster_commands/setup_db.py", line 27, in <module>
from kallithea.lib.db_manage import DbManage
File "/home/tdescham/repo/contrib/kallithea/kallithea-release/kallithea/lib/db_manage.py", line 47, in <module>
from kallithea.model.repo_group import RepoGroupModel
File "/home/tdescham/repo/contrib/kallithea/kallithea-release/kallithea/model/repo_group.py", line 35, in <module>
import kallithea.lib.utils
File "/home/tdescham/repo/contrib/kallithea/kallithea-release/kallithea/lib/utils.py", line 48, in <module>
from kallithea.model.repo_group import RepoGroupModel
ImportError: cannot import name RepoGroupModel


i.e. kallithea.model.repo_group wants to import kallithea.lib.utils which
in turn wants to import kallithea.model.repo_group.

In fact there exists kallithea.lib.utils and kallithea.lib.utils2.
The current split is that 'utils2' contains 'simple' utilities, none of which
depend on kallithea models, controllers, ... In contrast, 'utils' does rely
on such kallithea classes.

As kallithea.model.repo_group was only include kallithea.lib.utils for its
repo_name_slug method, which has no dependency on other kallithea classes,
move that method (and its dependent recursive_replace) to
kallithea.lib.utils2 instead. This fixes the import cycle.
5 files changed with 48 insertions and 46 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/utils.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Utilities library for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import re
 
import logging
 
import datetime
 
import traceback
 
import beaker
 

	
 
from tg import request, response
 
from tg.i18n import ugettext as _
 
from webhelpers.text import collapse, remove_formatting, strip_tags
 
from beaker.cache import _cache_decorate
 

	
 
from kallithea.lib.vcs.utils.hgcompat import ui, config
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.exceptions import VCSError
 

	
 
from kallithea.lib.exceptions import HgsubversionImportError
 
from kallithea.model import meta
 
from kallithea.model.db import Repository, User, Ui, \
 
    UserLog, RepoGroup, Setting, UserGroup
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_current_authuser
 
from kallithea.lib.vcs.utils.fakemod import create_module
 

	
 
log = logging.getLogger(__name__)
 

	
 
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return slug of name of repository
 
    This function is called on each creation/modification
 
    of repository to prevent bad names in repo
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in """`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
 

	
 

	
 
#==============================================================================
 
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
 
#==============================================================================
 
def get_repo_slug(request):
 
    _repo = request.environ['pylons.routes_dict'].get('repo_name')
 
    if _repo:
 
        _repo = _repo.rstrip('/')
 
    return _repo
 

	
 

	
 
def get_repo_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('group_name')
 
    if _group:
 
        _group = _group.rstrip('/')
 
    return _group
 

	
 

	
 
def get_user_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('id')
 
    _group = UserGroup.get(_group)
 
    if _group:
 
        return _group.users_group_name
 
    return None
 

	
 

	
 
def _extract_id_from_repo_name(repo_name):
 
    if repo_name.startswith('/'):
 
        repo_name = repo_name.lstrip('/')
 
    by_id_match = re.match(r'^_(\d{1,})', repo_name)
 
    if by_id_match:
 
        return by_id_match.groups()[0]
 

	
 

	
 
def get_repo_by_id(repo_name):
 
    """
 
    Extracts repo_name by id from special urls. Example url is _11/repo_name
 

	
 
    :param repo_name:
 
    :return: repo_name if matched else None
 
    """
 
    _repo_id = _extract_id_from_repo_name(repo_name)
 
    if _repo_id:
 
        from kallithea.model.db import Repository
 
        repo = Repository.get(_repo_id)
 
        if repo:
 
            # TODO: return repo instead of reponame? or would that be a layering violation?
 
            return repo.repo_name
 
    return None
kallithea/lib/utils2.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils2
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Some simple helper functions
 
Some simple helper functions.
 
Note: all these functions should be independent of Kallithea classes, i.e.
 
models, controllers, etc.  to prevent import cycles.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 5, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import os
 
import re
 
import sys
 
import time
 
import uuid
 
import datetime
 
import urllib
 
import binascii
 

	
 
import webob
 
import urlobject
 
from webhelpers.text import collapse, remove_formatting, strip_tags
 

	
 
from tg.i18n import ugettext as _, ungettext
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.compat import json
 

	
 

	
 
def str2bool(_str):
 
    """
 
    returns True/False value from given string, it tries to translate the
 
    string into boolean
 

	
 
    :param _str: string value to translate into boolean
 
    :rtype: boolean
 
    :returns: boolean from given string
 
    """
 
    if _str is None:
 
        return False
 
    if _str in (True, False):
 
        return _str
 
    _str = str(_str).strip().lower()
 
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
 

	
 

	
 
def aslist(obj, sep=None, strip=True):
 
    """
 
    Returns given string separated by sep as list
 

	
 
    :param obj:
 
    :param sep:
 
    :param strip:
 
    """
 
    if isinstance(obj, (basestring)):
 
        lst = obj.split(sep)
 
        if strip:
 
            lst = [v.strip() for v in lst]
 
        return lst
 
    elif isinstance(obj, (list, tuple)):
 
        return obj
 
    elif obj is None:
 
        return []
 
    else:
 
        return [obj]
 

	
 

	
 
def convert_line_endings(line, mode):
 
    """
 
    Converts a given line  "line end" according to given mode
 

	
 
@@ -601,48 +604,84 @@ OAttr = OptionalAttr
 

	
 
class Optional(object):
 
    """
 
    Defines an optional parameter::
 

	
 
        param = param.getval() if isinstance(param, Optional) else param
 
        param = param() if isinstance(param, Optional) else param
 

	
 
    is equivalent of::
 

	
 
        param = Optional.extract(param)
 

	
 
    """
 

	
 
    def __init__(self, type_):
 
        self.type_ = type_
 

	
 
    def __repr__(self):
 
        return '<Optional:%s>' % self.type_.__repr__()
 

	
 
    def __call__(self):
 
        return self.getval()
 

	
 
    def getval(self):
 
        """
 
        returns value from this Optional instance
 
        """
 
        if isinstance(self.type_, OAttr):
 
            # use params name
 
            return self.type_.attr_name
 
        return self.type_
 

	
 
    @classmethod
 
    def extract(cls, val):
 
        """
 
        Extracts value from Optional() instance
 

	
 
        :param val:
 
        :return: original value if it's not Optional instance else
 
            value of instance
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', safe_str(s)).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return slug of name of repository
 
    This function is called on each creation/modification
 
    of repository to prevent bad names in repo
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in """`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
kallithea/model/repo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.repo
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Repository model for kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 5, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import os
 
import shutil
 
import logging
 
import traceback
 
from datetime import datetime
 
from sqlalchemy.orm import subqueryload
 

	
 
import kallithea.lib.utils
 
import kallithea.lib.utils2
 
from kallithea.lib.utils import make_ui, is_valid_repo_uri
 
from kallithea.lib.vcs.backends import get_backend
 
from kallithea.lib.utils2 import LazyProperty, safe_str, safe_unicode, \
 
    remove_prefix, obfuscate_url_pw, get_current_authuser
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.hooks import log_delete_repository
 

	
 
from kallithea.model.db import Repository, UserRepoToPerm, UserGroupRepoToPerm, \
 
    UserRepoGroupToPerm, UserGroupRepoGroupToPerm, User, Permission, Session, \
 
    Statistics, UserGroup, Ui, RepoGroup, RepositoryField
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevel, HasUserGroupPermissionLevel
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.model.scm import UserGroupList
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoModel(object):
 

	
 
    URL_SEPARATOR = Repository.url_sep()
 

	
 
    def _create_default_perms(self, repository, private):
 
        # create default permission
 
        default = 'repository.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('repository.'):
 
                default = p.permission.permission_name
 
                break
 

	
 
        default_perm = 'repository.none' if private else default
 

	
 
        repo_to_perm = UserRepoToPerm()
 
        repo_to_perm.permission = Permission.get_by_key(default_perm)
 

	
 
        repo_to_perm.repository = repository
 
        repo_to_perm.user_id = def_user.user_id
 
        Session().add(repo_to_perm)
 

	
 
        return repo_to_perm
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 
@@ -271,148 +271,148 @@ class RepoModel(object):
 
        else:
 
            replacement_user = User.query().filter(User.admin ==
 
                                                   True).first().username
 
            defaults.update({'owner': replacement_user})
 

	
 
        # fill repository users
 
        for p in repo_info.repo_to_perm:
 
            defaults.update({'u_perm_%s' % p.user.username:
 
                                 p.permission.permission_name})
 

	
 
        # fill repository groups
 
        for p in repo_info.users_group_to_perm:
 
            defaults.update({'g_perm_%s' % p.users_group.users_group_name:
 
                                 p.permission.permission_name})
 

	
 
        return defaults
 

	
 
    def update(self, repo, **kwargs):
 
        try:
 
            cur_repo = Repository.guess_instance(repo)
 
            org_repo_name = cur_repo.repo_name
 
            if 'owner' in kwargs:
 
                cur_repo.owner = User.get_by_username(kwargs['owner'])
 

	
 
            if 'repo_group' in kwargs:
 
                assert kwargs['repo_group'] != u'-1', kwargs # RepoForm should have converted to None
 
                cur_repo.group = RepoGroup.get(kwargs['repo_group'])
 
                cur_repo.repo_name = cur_repo.get_new_name(cur_repo.just_name)
 
            log.debug('Updating repo %s with params:%s', cur_repo, kwargs)
 
            for k in ['repo_enable_downloads',
 
                      'repo_description',
 
                      'repo_enable_locking',
 
                      'repo_landing_rev',
 
                      'repo_private',
 
                      'repo_enable_statistics',
 
                      ]:
 
                if k in kwargs:
 
                    setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k])
 
            clone_uri = kwargs.get('clone_uri')
 
            if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden:
 
                # clone_uri is modified - if given a value, check it is valid
 
                if clone_uri != '':
 
                    # will raise exception on error
 
                    is_valid_repo_uri(cur_repo.repo_type, clone_uri, make_ui('db', clear_session=False))
 
                cur_repo.clone_uri = clone_uri
 

	
 
            if 'repo_name' in kwargs:
 
                repo_name = kwargs['repo_name']
 
                if kallithea.lib.utils.repo_name_slug(repo_name) != repo_name:
 
                if kallithea.lib.utils2.repo_name_slug(repo_name) != repo_name:
 
                    raise Exception('invalid repo name %s' % repo_name)
 
                cur_repo.repo_name = cur_repo.get_new_name(repo_name)
 

	
 
            # if private flag is set, reset default permission to NONE
 
            if kwargs.get('repo_private'):
 
                EMPTY_PERM = 'repository.none'
 
                RepoModel().grant_user_permission(
 
                    repo=cur_repo, user='default', perm=EMPTY_PERM
 
                )
 
                # handle extra fields
 
            for field in filter(lambda k: k.startswith(RepositoryField.PREFIX),
 
                                kwargs):
 
                k = RepositoryField.un_prefix_key(field)
 
                ex_field = RepositoryField.get_by_key_name(key=k, repo=cur_repo)
 
                if ex_field:
 
                    ex_field.field_value = kwargs[field]
 

	
 
            if org_repo_name != cur_repo.repo_name:
 
                # rename repository
 
                self._rename_filesystem_repo(old=org_repo_name, new=cur_repo.repo_name)
 

	
 
            return cur_repo
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _create_repo(self, repo_name, repo_type, description, owner,
 
                     private=False, clone_uri=None, repo_group=None,
 
                     landing_rev='rev:tip', fork_of=None,
 
                     copy_fork_permissions=False, enable_statistics=False,
 
                     enable_locking=False, enable_downloads=False,
 
                     copy_group_permissions=False, state=Repository.STATE_PENDING):
 
        """
 
        Create repository inside database with PENDING state. This should only be
 
        executed by create() repo, with exception of importing existing repos.
 

	
 
        """
 
        from kallithea.model.scm import ScmModel
 

	
 
        owner = User.guess_instance(owner)
 
        fork_of = Repository.guess_instance(fork_of)
 
        repo_group = RepoGroup.guess_instance(repo_group)
 
        try:
 
            repo_name = safe_unicode(repo_name)
 
            description = safe_unicode(description)
 
            # repo name is just a name of repository
 
            # while repo_name_full is a full qualified name that is combined
 
            # with name and path of group
 
            repo_name_full = repo_name
 
            repo_name = repo_name.split(self.URL_SEPARATOR)[-1]
 
            if kallithea.lib.utils.repo_name_slug(repo_name) != repo_name:
 
            if kallithea.lib.utils2.repo_name_slug(repo_name) != repo_name:
 
                raise Exception('invalid repo name %s' % repo_name)
 

	
 
            new_repo = Repository()
 
            new_repo.repo_state = state
 
            new_repo.enable_statistics = False
 
            new_repo.repo_name = repo_name_full
 
            new_repo.repo_type = repo_type
 
            new_repo.owner = owner
 
            new_repo.group = repo_group
 
            new_repo.description = description or repo_name
 
            new_repo.private = private
 
            if clone_uri:
 
                # will raise exception on error
 
                is_valid_repo_uri(repo_type, clone_uri, make_ui('db', clear_session=False))
 
            new_repo.clone_uri = clone_uri
 
            new_repo.landing_rev = landing_rev
 

	
 
            new_repo.enable_statistics = enable_statistics
 
            new_repo.enable_locking = enable_locking
 
            new_repo.enable_downloads = enable_downloads
 

	
 
            if repo_group:
 
                new_repo.enable_locking = repo_group.enable_locking
 

	
 
            if fork_of:
 
                parent_repo = fork_of
 
                new_repo.fork = parent_repo
 

	
 
            Session().add(new_repo)
 

	
 
            if fork_of and copy_fork_permissions:
 
                repo = fork_of
 
                user_perms = UserRepoToPerm.query() \
 
                    .filter(UserRepoToPerm.repository == repo).all()
 
                group_perms = UserGroupRepoToPerm.query() \
 
                    .filter(UserGroupRepoToPerm.repository == repo).all()
 

	
 
                for perm in user_perms:
 
                    UserRepoToPerm.create(perm.user, new_repo, perm.permission)
 

	
 
                for perm in group_perms:
 
                    UserGroupRepoToPerm.create(perm.users_group, new_repo,
 
                                               perm.permission)
 

	
 
            elif repo_group and copy_group_permissions:
 

	
 
                user_perms = UserRepoGroupToPerm.query() \
 
                    .filter(UserRepoGroupToPerm.group == repo_group).all()
kallithea/model/repo_group.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.repo_group
 
~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
repo group model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 25, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import os
 
import logging
 
import traceback
 
import shutil
 
import datetime
 

	
 
import kallithea.lib.utils
 
import kallithea.lib.utils2
 
from kallithea.lib.utils2 import LazyProperty
 

	
 
from kallithea.model.db import RepoGroup, Session, Ui, UserRepoGroupToPerm, \
 
    User, Permission, UserGroupRepoGroupToPerm, UserGroup, Repository
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoGroupModel(object):
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = Ui.get_by_key('paths', '/')
 
        return q.ui_value
 

	
 
    def _create_default_perms(self, new_group):
 
        # create default permission
 
        default_perm = 'group.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('group.'):
 
                default_perm = p.permission.permission_name
 
                break
 

	
 
        repo_group_to_perm = UserRepoGroupToPerm()
 
        repo_group_to_perm.permission = Permission.get_by_key(default_perm)
 

	
 
        repo_group_to_perm.group = new_group
 
        repo_group_to_perm.user_id = def_user.user_id
 
        Session().add(repo_group_to_perm)
 
        return repo_group_to_perm
 

	
 
    def _create_group(self, group_name):
 
        """
 
        makes repository group on filesystem
 

	
 
        :param repo_name:
 
        :param parent_id:
 
        """
 

	
 
        create_path = os.path.join(self.repos_path, group_name)
 
        log.debug('creating new group in %s', create_path)
 

	
 
        if os.path.isdir(create_path):
 
@@ -91,97 +91,97 @@ class RepoGroupModel(object):
 
        Renames a group on filesystem
 

	
 
        :param group_name:
 
        """
 

	
 
        if old == new:
 
            log.debug('skipping group rename')
 
            return
 

	
 
        log.debug('renaming repository group from %s to %s', old, new)
 

	
 
        old_path = os.path.join(self.repos_path, old)
 
        new_path = os.path.join(self.repos_path, new)
 

	
 
        log.debug('renaming repos paths from %s to %s', old_path, new_path)
 

	
 
        if os.path.isdir(new_path):
 
            raise Exception('Was trying to rename to already '
 
                            'existing dir %s' % new_path)
 
        shutil.move(old_path, new_path)
 

	
 
    def _delete_group(self, group, force_delete=False):
 
        """
 
        Deletes a group from a filesystem
 

	
 
        :param group: instance of group from database
 
        :param force_delete: use shutil rmtree to remove all objects
 
        """
 
        paths = group.full_path.split(RepoGroup.url_sep())
 
        paths = os.sep.join(paths)
 

	
 
        rm_path = os.path.join(self.repos_path, paths)
 
        log.info("Removing group %s", rm_path)
 
        # delete only if that path really exists
 
        if os.path.isdir(rm_path):
 
            if force_delete:
 
                shutil.rmtree(rm_path)
 
            else:
 
                # archive that group
 
                _now = datetime.datetime.now()
 
                _ms = str(_now.microsecond).rjust(6, '0')
 
                _d = 'rm__%s_GROUP_%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                                          group.name)
 
                shutil.move(rm_path, os.path.join(self.repos_path, _d))
 

	
 
    def create(self, group_name, group_description, owner, parent=None,
 
               just_db=False, copy_permissions=False):
 
        try:
 
            if kallithea.lib.utils.repo_name_slug(group_name) != group_name:
 
            if kallithea.lib.utils2.repo_name_slug(group_name) != group_name:
 
                raise Exception('invalid repo group name %s' % group_name)
 

	
 
            owner = User.guess_instance(owner)
 
            parent_group = RepoGroup.guess_instance(parent)
 
            new_repo_group = RepoGroup()
 
            new_repo_group.owner = owner
 
            new_repo_group.group_description = group_description or group_name
 
            new_repo_group.parent_group = parent_group
 
            new_repo_group.group_name = new_repo_group.get_new_name(group_name)
 

	
 
            Session().add(new_repo_group)
 

	
 
            # create an ADMIN permission for owner except if we're super admin,
 
            # later owner should go into the owner field of groups
 
            if not owner.is_admin:
 
                self.grant_user_permission(repo_group=new_repo_group,
 
                                           user=owner, perm='group.admin')
 

	
 
            if parent_group and copy_permissions:
 
                # copy permissions from parent
 
                user_perms = UserRepoGroupToPerm.query() \
 
                    .filter(UserRepoGroupToPerm.group == parent_group).all()
 

	
 
                group_perms = UserGroupRepoGroupToPerm.query() \
 
                    .filter(UserGroupRepoGroupToPerm.group == parent_group).all()
 

	
 
                for perm in user_perms:
 
                    # don't copy over the permission for user who is creating
 
                    # this group, if he is not super admin he get's admin
 
                    # permission set above
 
                    if perm.user != owner or owner.is_admin:
 
                        UserRepoGroupToPerm.create(perm.user, new_repo_group, perm.permission)
 

	
 
                for perm in group_perms:
 
                    UserGroupRepoGroupToPerm.create(perm.users_group, new_repo_group, perm.permission)
 
            else:
 
                self._create_default_perms(new_repo_group)
 

	
 
            if not just_db:
 
                # we need to flush here, in order to check if database won't
 
                # throw any exceptions, create filesystem dirs at the very end
 
                Session().flush()
 
                self._create_group(new_repo_group.group_name)
 

	
 
            return new_repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 
@@ -250,97 +250,97 @@ class RepoGroupModel(object):
 
                obj = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for user group
 
                else:
 
                    # check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    # check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
        return updates
 

	
 
    def update(self, repo_group, kwargs):
 
        try:
 
            repo_group = RepoGroup.guess_instance(repo_group)
 
            old_path = repo_group.full_path
 

	
 
            # change properties
 
            if 'group_description' in kwargs:
 
                repo_group.group_description = kwargs['group_description']
 
            if 'parent_group_id' in kwargs:
 
                repo_group.parent_group_id = kwargs['parent_group_id']
 
            if 'enable_locking' in kwargs:
 
                repo_group.enable_locking = kwargs['enable_locking']
 

	
 
            if 'parent_group_id' in kwargs:
 
                assert kwargs['parent_group_id'] != u'-1', kwargs # RepoGroupForm should have converted to None
 
                repo_group.parent_group = RepoGroup.get(kwargs['parent_group_id'])
 
            if 'group_name' in kwargs:
 
                group_name = kwargs['group_name']
 
                if kallithea.lib.utils.repo_name_slug(group_name) != group_name:
 
                if kallithea.lib.utils2.repo_name_slug(group_name) != group_name:
 
                    raise Exception('invalid repo group name %s' % group_name)
 
                repo_group.group_name = repo_group.get_new_name(group_name)
 
            new_path = repo_group.full_path
 
            Session().add(repo_group)
 

	
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
 
            # to the parent
 
            # if obj is a Repo fix it's name
 
            # this can be potentially heavy operation
 
            for obj in repo_group.recursive_groups_and_repos():
 
                # set the value from it's parent
 
                obj.enable_locking = repo_group.enable_locking
 
                if isinstance(obj, RepoGroup):
 
                    new_name = obj.get_new_name(obj.name)
 
                    log.debug('Fixing group %s to new name %s' \
 
                                % (obj.group_name, new_name))
 
                    obj.group_name = new_name
 
                elif isinstance(obj, Repository):
 
                    # we need to get all repositories from this new group and
 
                    # rename them accordingly to new group path
 
                    new_name = obj.get_new_name(obj.just_name)
 
                    log.debug('Fixing repo %s to new name %s' \
 
                                % (obj.repo_name, new_name))
 
                    obj.repo_name = new_name
 

	
 
            self._rename_group(old_path, new_path)
 

	
 
            return repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, repo_group, force_delete=False):
 
        repo_group = RepoGroup.guess_instance(repo_group)
 
        try:
 
            Session().delete(repo_group)
 
            self._delete_group(repo_group, force_delete)
 
        except Exception:
 
            log.error('Error removing repo_group %s', repo_group)
 
            raise
 

	
 
    def add_permission(self, repo_group, obj, obj_type, perm, recursive):
 
        from kallithea.model.repo import RepoModel
 
        repo_group = RepoGroup.guess_instance(repo_group)
 
        perm = Permission.guess_instance(perm)
 

	
kallithea/model/validators.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Set of generic validators
 
"""
 

	
 
import os
 
import re
 
import formencode
 
import logging
 
from collections import defaultdict
 
from tg.i18n import ugettext as _
 
from sqlalchemy import func
 
from webhelpers.pylonslib.secure_form import authentication_token
 
import sqlalchemy
 

	
 
from formencode.validators import (
 
    UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 
)
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib import ipaddr
 
from kallithea.lib.utils import repo_name_slug, is_valid_repo_uri
 
from kallithea.lib.utils2 import str2bool, aslist
 
from kallithea.lib.utils import is_valid_repo_uri
 
from kallithea.lib.utils2 import str2bool, aslist, repo_name_slug
 
from kallithea.model.db import RepoGroup, Repository, UserGroup, User
 
from kallithea.lib.exceptions import LdapImportError
 
from kallithea.config.routing import ADMIN_PREFIX
 
from kallithea.lib.auth import HasRepoGroupPermissionLevel, HasPermissionAny
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def UniqueListFromString():
 
    class _UniqueListFromString(formencode.FancyValidator):
 
        """
 
        Split value on ',' and make unique while preserving order
 
        """
 
        messages = dict(
 
            empty=_('Value cannot be an empty list'),
 
            missing_value=_('Value cannot be an empty list'),
 
        )
 

	
 
        def _to_python(self, value, state):
 
            value = aslist(value, ',')
 
            seen = set()
 
            return [c for c in value if not (c in seen or seen.add(c))]
 

	
 
        def empty_value(self, value):
 
            return []
 

	
 
    return _UniqueListFromString
 

	
 

	
 
def ValidUsername(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'username_exists': _('Username "%(username)s" already exists'),
 
            'system_invalid_username':
 
                _('Username "%(username)s" cannot be used'),
 
            'invalid_username':
 
                _('Username may only contain alphanumeric characters '
 
                  'underscores, periods or dashes and must begin with an '
 
                  'alphanumeric character or underscore')
 
        }
 

	
 
        def validate_python(self, value, state):
0 comments (0 inline, 0 general)