Changeset - 36e22160e5e5
[Not reviewed]
default
0 12 0
Søren Løvborg - 9 years ago 2016-11-07 14:59:39
sorenl@unity3d.com
db: rename RepoGroup.group_parent_id to parent_group_id

Also for consistency with the existing parent_group relationship.
10 files changed:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/repo_groups.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.repo_groups
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Repository groups controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 23, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 
import formencode
 
import itertools
 

	
 
from formencode import htmlfill
 

	
 
from pylons import request, tmpl_context as c
 
from pylons.i18n.translation import _, ungettext
 
from webob.exc import HTTPFound, HTTPForbidden, HTTPNotFound, HTTPInternalServerError
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import LoginRequired, \
 
    HasRepoGroupPermissionAnyDecorator, HasRepoGroupPermissionAny, \
 
    HasPermissionAny
 
from kallithea.lib.base import BaseController, render
 
from kallithea.model.db import RepoGroup, Repository
 
from kallithea.model.scm import RepoGroupList, AvailableRepoGroupChoices
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.forms import RepoGroupForm, RepoGroupPermsForm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.lib.utils2 import safe_int
 
from sqlalchemy.sql.expression import func
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoGroupsController(BaseController):
 

	
 
    @LoginRequired()
 
    def __before__(self):
 
        super(RepoGroupsController, self).__before__()
 

	
 
    def __load_defaults(self, extras=(), exclude=()):
 
        """extras is used for keeping current parent ignoring permissions
 
        exclude is used for not moving group to itself TODO: also exclude descendants
 
        Note: only admin can create top level groups
 
        """
 
        repo_groups = AvailableRepoGroupChoices([], ['group.admin'], extras)
 
        exclude_group_ids = set(rg.group_id for rg in exclude)
 
        c.repo_groups = [rg for rg in repo_groups
 
                         if rg[0] not in exclude_group_ids]
 

	
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.user_groups_array = repo_model.get_user_groups_js()
 

	
 
    def __load_data(self, group_id):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param group_id:
 
        """
 
        repo_group = RepoGroup.get_or_404(group_id)
 
        data = repo_group.get_dict()
 
        data['group_name'] = repo_group.name
 

	
 
        # fill repository group users
 
        for p in repo_group.repo_group_to_perm:
 
            data.update({'u_perm_%s' % p.user.username:
 
                             p.permission.permission_name})
 

	
 
        # fill repository group groups
 
        for p in repo_group.users_group_to_perm:
 
            data.update({'g_perm_%s' % p.users_group.users_group_name:
 
                             p.permission.permission_name})
 

	
 
        return data
 

	
 
    def _revoke_perms_on_yourself(self, form_result):
 
        _up = filter(lambda u: c.authuser.username == u[0],
 
                     form_result['perms_updates'])
 
        _new = filter(lambda u: c.authuser.username == u[0],
 
                      form_result['perms_new'])
 
        if _new and _new[0][1] != 'group.admin' or _up and _up[0][1] != 'group.admin':
 
            return True
 
        return False
 

	
 
    def index(self, format='html'):
 
        _list = RepoGroup.query(sorted=True).all()
 
        group_iter = RepoGroupList(_list, perm_set=['group.admin'])
 
        repo_groups_data = []
 
        total_records = len(group_iter)
 
        _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        repo_group_name = lambda repo_group_name, children_groups: (
 
            template.get_def("repo_group_name")
 
            .render(repo_group_name, children_groups, _=_, h=h, c=c)
 
        )
 
        repo_group_actions = lambda repo_group_id, repo_group_name, gr_count: (
 
            template.get_def("repo_group_actions")
 
            .render(repo_group_id, repo_group_name, gr_count, _=_, h=h, c=c,
 
                    ungettext=ungettext)
 
        )
 

	
 
        for repo_gr in group_iter:
 
            children_groups = map(h.safe_unicode,
 
                itertools.chain((g.name for g in repo_gr.parents),
 
                                (x.name for x in [repo_gr])))
 
            repo_count = repo_gr.repositories.count()
 
            repo_groups_data.append({
 
                "raw_name": repo_gr.group_name,
 
                "group_name": repo_group_name(repo_gr.group_name, children_groups),
 
                "desc": h.escape(repo_gr.group_description),
 
                "repos": repo_count,
 
                "owner": h.person(repo_gr.owner),
 
                "action": repo_group_actions(repo_gr.group_id, repo_gr.group_name,
 
                                             repo_count)
 
            })
 

	
 
        c.data = json.dumps({
 
            "totalRecords": total_records,
 
            "startIndex": 0,
 
            "sort": None,
 
            "dir": "asc",
 
            "records": repo_groups_data
 
        })
 

	
 
        return render('admin/repo_groups/repo_groups.html')
 

	
 
    def create(self):
 
        self.__load_defaults()
 

	
 
        # permissions for can create group based on parent_id are checked
 
        # here in the Form
 
        repo_group_form = RepoGroupForm(repo_groups=c.repo_groups)
 
        try:
 
            form_result = repo_group_form.to_python(dict(request.POST))
 
            gr = RepoGroupModel().create(
 
                group_name=form_result['group_name'],
 
                group_description=form_result['group_description'],
 
                parent=form_result['group_parent_id'],
 
                parent=form_result['parent_group_id'],
 
                owner=self.authuser.user_id, # TODO: make editable
 
                copy_permissions=form_result['group_copy_permissions']
 
            )
 
            Session().commit()
 
            #TODO: in futureaction_logger(, '', '', '', self.sa)
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('admin/repo_groups/repo_group_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during creation of repository group %s') \
 
                    % request.POST.get('group_name'), category='error')
 
            parent_group_id = form_result['group_parent_id']
 
            parent_group_id = form_result['parent_group_id']
 
            #TODO: maybe we should get back to the main view, not the admin one
 
            raise HTTPFound(location=url('repos_groups', parent_group=parent_group_id))
 
        h.flash(_('Created repository group %s') % gr.group_name,
 
                category='success')
 
        raise HTTPFound(location=url('repos_group_home', group_name=gr.group_name))
 

	
 
    def new(self):
 
        if HasPermissionAny('hg.admin')('group create'):
 
            #we're global admin, we're ok and we can create TOP level groups
 
            pass
 
        else:
 
            # we pass in parent group into creation form, thus we know
 
            # what would be the group, we can check perms here !
 
            group_id = safe_int(request.GET.get('parent_group'))
 
            group = RepoGroup.get(group_id) if group_id else None
 
            group_name = group.group_name if group else None
 
            if HasRepoGroupPermissionAny('group.admin')(group_name, 'group create'):
 
                pass
 
            else:
 
                raise HTTPForbidden()
 

	
 
        self.__load_defaults()
 
        return render('admin/repo_groups/repo_group_add.html')
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    def update(self, group_name):
 
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)
 
        self.__load_defaults(extras=[c.repo_group.parent_group],
 
                             exclude=[c.repo_group])
 

	
 
        # TODO: kill allow_empty_group - it is only used for redundant form validation!
 
        if HasPermissionAny('hg.admin')('group edit'):
 
            #we're global admin, we're ok and we can create TOP level groups
 
            allow_empty_group = True
 
        elif not c.repo_group.parent_group:
 
            allow_empty_group = True
 
        else:
 
            allow_empty_group = False
 
        repo_group_form = RepoGroupForm(
 
            edit=True,
 
            old_data=c.repo_group.get_dict(),
 
            repo_groups=c.repo_groups,
 
            can_create_in_root=allow_empty_group,
 
        )()
 
        try:
 
            form_result = repo_group_form.to_python(dict(request.POST))
 

	
 
            new_gr = RepoGroupModel().update(group_name, form_result)
 
            Session().commit()
 
            h.flash(_('Updated repository group %s') \
 
                    % form_result['group_name'], category='success')
 
            # we now have new name !
 
            group_name = new_gr.group_name
 
            #TODO: in future action_logger(, '', '', '', self.sa)
 
        except formencode.Invalid as errors:
 

	
 
            return htmlfill.render(
 
                render('admin/repo_groups/repo_group_edit.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of repository group %s') \
 
                    % request.POST.get('group_name'), category='error')
 

	
 
        raise HTTPFound(location=url('edit_repo_group', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    def delete(self, group_name):
 
        gr = c.repo_group = RepoGroupModel()._get_repo_group(group_name)
 
        repos = gr.repositories.all()
 
        if repos:
 
            h.flash(_('This group contains %s repositories and cannot be '
 
                      'deleted') % len(repos), category='warning')
 
            raise HTTPFound(location=url('repos_groups'))
 

	
 
        children = gr.children.all()
 
        if children:
 
            h.flash(_('This group contains %s subgroups and cannot be deleted'
 
                      % (len(children))), category='warning')
 
            raise HTTPFound(location=url('repos_groups'))
 

	
 
        try:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            h.flash(_('Removed repository group %s') % group_name,
 
                    category='success')
 
            #TODO: in future action_logger(, '', '', '', self.sa)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during deletion of repository group %s')
 
                    % group_name, category='error')
 

	
 
        if gr.parent_group:
 
            raise HTTPFound(location=url('repos_group_home', group_name=gr.parent_group.group_name))
 
        raise HTTPFound(location=url('repos_groups'))
 

	
 
    def show_by_name(self, group_name):
 
        """
 
        This is a proxy that does a lookup group_name -> id, and shows
 
        the group by id view instead
 
        """
 
        group_name = group_name.rstrip('/')
 
        id_ = RepoGroup.get_by_group_name(group_name)
 
        if id_:
 
            return self.show(group_name)
 
        raise HTTPNotFound
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.read', 'group.write',
 
                                         'group.admin')
 
    def show(self, group_name):
 
        c.active = 'settings'
 

	
 
        c.group = c.repo_group = RepoGroupModel()._get_repo_group(group_name)
 

	
 
        groups = RepoGroup.query(sorted=True).filter_by(parent_group=c.group).all()
 
        c.groups = self.scm_model.get_repo_groups(groups)
 

	
 
        repos_list = Repository.query(sorted=True).filter_by(group=c.group).all()
 
        repos_data = RepoModel().get_repos_as_dict(repos_list=repos_list,
 
                                                   admin=False, short_name=True)
 
        #json used to render the grid
 
        c.data = json.dumps(repos_data)
 

	
 
        return render('admin/repo_groups/repo_group_show.html')
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    def edit(self, group_name):
 
        c.active = 'settings'
 

	
 
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)
 
        self.__load_defaults(extras=[c.repo_group.parent_group],
 
                             exclude=[c.repo_group])
 
        defaults = self.__load_data(c.repo_group.group_id)
 

	
 
        return htmlfill.render(
 
            render('admin/repo_groups/repo_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    def edit_repo_group_advanced(self, group_name):
 
        c.active = 'advanced'
 
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)
 

	
 
        return render('admin/repo_groups/repo_group_edit.html')
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    def edit_repo_group_perms(self, group_name):
 
        c.active = 'perms'
 
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)
 
        self.__load_defaults()
 
        defaults = self.__load_data(c.repo_group.group_id)
 

	
 
        return htmlfill.render(
 
            render('admin/repo_groups/repo_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    def update_perms(self, group_name):
 
        """
 
        Update permissions for given repository group
 

	
 
        :param group_name:
 
        """
 

	
 
        c.repo_group = RepoGroupModel()._get_repo_group(group_name)
 
        valid_recursive_choices = ['none', 'repos', 'groups', 'all']
 
        form_result = RepoGroupPermsForm(valid_recursive_choices)().to_python(request.POST)
 
        if not c.authuser.is_admin:
 
            if self._revoke_perms_on_yourself(form_result):
 
                msg = _('Cannot revoke permission for yourself as admin')
 
                h.flash(msg, category='warning')
 
                raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 
        recursive = form_result['recursive']
 
        # iterate over all members(if in recursive mode) of this groups and
 
        # set the permissions !
 
        # this can be potentially heavy operation
 
        RepoGroupModel()._update_permissions(c.repo_group,
 
                                             form_result['perms_new'],
 
                                             form_result['perms_updates'],
 
                                             recursive)
 
        #TODO: implement this
 
        #action_logger(self.authuser, 'admin_changed_repo_permissions',
 
        #              repo_name, self.ip_addr, self.sa)
 
        Session().commit()
 
        h.flash(_('Repository group permissions updated'), category='success')
 
        raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    def delete_perms(self, group_name):
 
        try:
 
            obj_type = request.POST.get('obj_type')
 
            obj_id = None
 
            if obj_type == 'user':
 
                obj_id = safe_int(request.POST.get('user_id'))
 
            elif obj_type == 'user_group':
 
                obj_id = safe_int(request.POST.get('user_group_id'))
 

	
 
            if not c.authuser.is_admin:
 
                if obj_type == 'user' and c.authuser.user_id == obj_id:
 
                    msg = _('Cannot revoke permission for yourself as admin')
 
                    h.flash(msg, category='warning')
 
                    raise Exception('revoke admin permission on self')
 
            recursive = request.POST.get('recursive', 'none')
 
            if obj_type == 'user':
 
                RepoGroupModel().delete_permission(repo_group=group_name,
 
                                                   obj=obj_id, obj_type='user',
 
                                                   recursive=recursive)
 
            elif obj_type == 'user_group':
 
                RepoGroupModel().delete_permission(repo_group=group_name,
 
                                                   obj=obj_id,
 
                                                   obj_type='user_group',
 
                                                   recursive=recursive)
 

	
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during revoking of permission'),
 
                    category='error')
 
            raise HTTPInternalServerError()
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import time
 
import logging
 
import datetime
 
import traceback
 
import hashlib
 
import collections
 
import functools
 

	
 
import sqlalchemy
 
from sqlalchemy import *
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import relationship, joinedload, class_mapper, validates
 
from beaker.cache import cache_region, region_invalidate
 
from webob.exc import HTTPNotFound
 

	
 
from pylons.i18n.translation import lazy_ugettext as _
 

	
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea.lib.utils2 import str2bool, safe_str, get_changeset_safe, \
 
    safe_unicode, remove_prefix, time_to_datetime, aslist, Optional, safe_int, \
 
    get_clone_url, urlreadable
 
from kallithea.lib.compat import json
 
from kallithea.lib.caching_query import FromCache
 

	
 
from kallithea.model.meta import Base, Session
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
_hash_key = lambda k: hashlib.md5(safe_str(k)).hexdigest()
 

	
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.iteritems():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        l = []
 
        for k in self._get_keys():
 
            l.append((k, getattr(self, k),))
 
        return l
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, (int, long)) or safe_str(value).isdigit():
 
            return cls.get(value)
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        Session().delete(obj)
 

	
 
    def __repr__(self):
 
        if hasattr(self, '__unicode__'):
 
            # python repr needs to return str
 
            try:
 
                return safe_str(self.__unicode__())
 
            except UnicodeDecodeError:
 
                pass
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'mysql_charset': 'utf8',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_str,
 
        'int': safe_int,
 
        'unicode': safe_unicode,
 
        'bool': str2bool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 
    DEFAULT_UPDATE_URL = ''
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert type(val) == unicode
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use unicode in app_settings_value
 

	
 
        :param val:
 
        """
 
        self._app_settings_value = safe_unicode(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (self.SETTINGS_TYPES.keys(), val))
 
        self._app_settings_type = val
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s[%s]')>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_value, self.app_settings_type
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set. Optional instance will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            val = Optional.extract(val)
 
            type = Optional.extract(type)
 
            res = cls(key, val, type)
 
            Session().add(res)
 
        else:
 
            res.app_settings_name = key
 
            if not isinstance(val, Optional):
 
                # update if set
 
                res.app_settings_value = val
 
            if not isinstance(type, Optional):
 
                # update if set
 
                res.app_settings_type = type
 
        return res
 

	
 
    @classmethod
 
    def get_app_settings(cls, cache=False):
 

	
 
        ret = cls.query()
 

	
 
        if cache:
 
            ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
 

	
 
        if ret is None:
 
            raise Exception('Could not get application settings !')
 
        settings = {}
 
        for each in ret:
 
            settings[each.app_settings_name] = \
 
                each.app_settings_value
 

	
 
        return settings
 

	
 
    @classmethod
 
    def get_auth_plugins(cls, cache=False):
 
        auth_plugins = cls.get_by_name("auth_plugins").app_settings_value
 
        return auth_plugins
 

	
 
    @classmethod
 
    def get_auth_settings(cls, cache=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('auth_')).all()
 
        fd = {}
 
        for row in ret:
 
            fd[row.app_settings_name] = row.app_settings_value
 
        return fd
 

	
 
    @classmethod
 
    def get_default_repo_settings(cls, cache=False, strip_prefix=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('default_')).all()
 
        fd = {}
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import pkg_resources
 
        import platform
 
        import kallithea
 
        from kallithea.lib.utils import check_git_version
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': safe_unicode(platform.platform()),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': safe_unicode(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        # FIXME: ui_key as key is wrong and should be removed when the corresponding
 
        # Ui.get_by_key has been replaced by the composite key
 
        UniqueConstraint('ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.update'
 
    HOOK_REPO_SIZE = 'changegroup.repo_size'
 
    HOOK_PUSH = 'changegroup.push_logger'
 
    HOOK_PRE_PUSH = 'prechangegroup.pre_push'
 
    HOOK_PULL = 'outgoing.pull_logger'
 
    HOOK_PRE_PULL = 'preoutgoing.pre_pull'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
 
                                     cls.HOOK_PUSH, cls.HOOK_PRE_PUSH,
 
                                     cls.HOOK_PULL, cls.HOOK_PRE_PULL]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
 
                                      cls.HOOK_PUSH, cls.HOOK_PRE_PUSH,
 
                                      cls.HOOK_PULL, cls.HOOK_PRE_PULL]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s[%s]%s=>%s]>' % (self.__class__.__name__, self.ui_section,
 
                                    self.ui_key, self.ui_value)
 

	
 

	
 
class User(Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    inherit_default_permissions = Column(Boolean(), nullable=False, default=True)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    notifications = relationship('UserNotification', cascade='all')
 
    # notifications assigned to this user
 
    user_created_notifications = relationship('Notification', cascade='all')
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    #extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    #extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 

	
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user==self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user==self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @property
 
    def AuthUser(self):
 
        """
 
        Returns instance of AuthUser for this user
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        return AuthUser(dbuser=self)
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.user_id, self.username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseDbModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if allow_default == False:
 
            if user.username == User.DEFAULT_USER:
 
                raise DefaultUserException
 
        return user
 

	
 
    @classmethod
 
    def get_by_username_or_email(cls, username_or_email, case_insensitive=False, cache=False):
 
        """
 
        For anything that looks like an email address, look up by the email address (matching
 
        case insensitively).
 
        For anything else, try to look up by the user name.
 

	
 
        This assumes no normal username can have '@' symbol.
 
        """
 
        if '@' in username_or_email:
 
            return User.get_by_email(username_or_email, cache=cache)
 
        else:
 
            return User.get_by_username(username_or_email, case_insensitive=case_insensitive, cache=cache)
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False, cache=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.username) == func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 

	
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(username)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, cache=False, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_api_key_%s" % api_key))
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            #fallback to additional keys
 
            _res = UserApiKeys.query() \
 
                .filter(UserApiKeys.api_key == api_key) \
 
                .filter(or_(UserApiKeys.expires == -1,
 
                            UserApiKeys.expires >= time.time())) \
 
                .first()
 
            if _res:
 
                res = _res.user
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(func.lower(cls.email) == func.lower(email))
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_email_key_%s" % email))
 

	
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(func.lower(UserEmailMap.email) == func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls, cache=False):
 
        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            emails=user.emails,
 
            active=user.active,
 
            admin=user.admin,
 
        )
 
        if details:
 
            data.update(dict(
 
                extern_type=user.extern_type,
 
                extern_name=user.extern_name,
 
                api_key=user.api_key,
 
                api_keys=user.api_keys,
 
                last_login=user.last_login,
 
                ip_addresses=user.ip_addresses
 
                ))
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
            full_name=self.full_name,
 
            full_name_or_username=self.full_name_or_username,
 
            short_contact=self.short_contact,
 
            full_contact=self.full_contact
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 

	
 
class UserApiKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_api_keys'
 
    __table_args__ = (
 
        Index('uak_api_key_idx', 'api_key'),
 
        Index('uak_api_key_expires_idx', 'api_key', 'expires'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    user_api_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    api_key = Column(String(255), nullable=False, unique=True)
 
    description = Column(UnicodeText(), nullable=False)
 
    expires = Column(Float(53), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User')
 

	
 
    @property
 
    def expired(self):
 
        if self.expires == -1:
 
            return False
 
        return time.time() > self.expires
 

	
 

	
 
class UserEmailMap(Base, BaseDbModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        Index('uem_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    email_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _email = Column("email", String(255), nullable=False, unique=True)
 
    user = relationship('User')
 

	
 
    @validates('_email')
 
    def validate_email(self, key, email):
 
        # check if this email is not main one
 
        main_email = Session().query(User).filter(User.email == email).scalar()
 
        if main_email is not None:
 
            raise AttributeError('email %s is present is user table' % email)
 
        return email
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 

	
 
class UserIpMap(Base, BaseDbModel):
 
    __tablename__ = 'user_ip_map'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'ip_addr'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    ip_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    ip_addr = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    user = relationship('User')
 

	
 
    @classmethod
 
    def _get_ip_range(cls, ip_addr):
 
        from kallithea.lib import ipaddr
 
        net = ipaddr.IPNetwork(address=ip_addr)
 
        return [str(net.network), str(net.broadcast)]
 

	
 
    def __json__(self):
 
        return dict(
 
          ip_addr=self.ip_addr,
 
          ip_range=self._get_ip_range(self.ip_addr)
 
        )
 

	
 
    def __unicode__(self):
 
        return u"<%s('user_id:%s=>%s')>" % (self.__class__.__name__,
 
                                            self.user_id, self.ip_addr)
 

	
 
class UserLog(Base, BaseDbModel):
 
    __tablename__ = 'user_logs'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    user_log_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
 
    username = Column(String(255), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    repository_name = Column(Unicode(255), nullable=False)
 
    user_ip = Column(String(255), nullable=True)
 
    action = Column(UnicodeText(), nullable=False)
 
    action_date = Column(DateTime(timezone=False), nullable=False)
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.repository_name,
 
                                      self.action)
 

	
 
    @property
 
    def action_as_day(self):
 
        return datetime.date(*self.action_date.timetuple()[:3])
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository', cascade='')
 

	
 

	
 
class UserGroup(Base, BaseDbModel):
 
    __tablename__ = 'users_groups'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_id = Column(Integer(), primary_key=True)
 
    users_group_name = Column(Unicode(255), nullable=False, unique=True)
 
    user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
 
    users_group_active = Column(Boolean(), nullable=False)
 
    inherit_default_permissions = Column("users_group_inherit_default_permissions", Boolean(), nullable=False, default=True)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _group_data = Column("group_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    members = relationship('UserGroupMember', cascade="all, delete-orphan")
 
    users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
 
    users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    user_user_group_to_perm = relationship('UserUserGroupToPerm ', cascade='all')
 
    user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm ', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def group_data(self):
 
        if not self._group_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._group_data)
 
        except TypeError:
 
            return {}
 

	
 
    @group_data.setter
 
    def group_data(self, val):
 
        try:
 
            self._group_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.users_group_id,
 
                                      self.users_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False,
 
                          case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.users_group_name) == func.lower(group_name))
 
        else:
 
            q = cls.query().filter(cls.users_group_name == group_name)
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get(cls, user_group_id, cache=False):
 
        user_group = cls.query()
 
        if cache:
 
            user_group = user_group.options(FromCache("sql_cache_short",
 
                                    "get_users_group_%s" % user_group_id))
 
        return user_group.get(user_group_id)
 

	
 
    def get_api_data(self, with_members=True):
 
        user_group = self
 

	
 
        data = dict(
 
            users_group_id=user_group.users_group_id,
 
            group_name=user_group.users_group_name,
 
            group_description=user_group.user_group_description,
 
            active=user_group.users_group_active,
 
            owner=user_group.owner.username,
 
        )
 
        if with_members:
 
            members = []
 
            for user in user_group.members:
 
                user = user.user
 
                members.append(user.get_api_data())
 
            data['members'] = members
 

	
 
        return data
 

	
 

	
 
class UserGroupMember(Base, BaseDbModel):
 
    __tablename__ = 'users_groups_members'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_member_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    users_group = relationship('UserGroup')
 

	
 
    def __init__(self, gr_id='', u_id=''):
 
        self.users_group_id = gr_id
 
        self.user_id = u_id
 

	
 

	
 
class RepositoryField(Base, BaseDbModel):
 
    __tablename__ = 'repositories_fields'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'field_key'),  # no-multi field
 
        _table_args_default_dict,
 
    )
 

	
 
    PREFIX = 'ex_'  # prefix used in form to not conflict with already existing fields
 

	
 
    repo_field_id = Column(Integer(), primary_key=True)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    field_key = Column(String(250), nullable=False)
 
    field_label = Column(String(1024), nullable=False)
 
    field_value = Column(String(10000), nullable=False)
 
    field_desc = Column(String(1024), nullable=False)
 
    field_type = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    repository = relationship('Repository')
 

	
 
    @property
 
    def field_key_prefixed(self):
 
        return 'ex_%s' % self.field_key
 

	
 
    @classmethod
 
    def un_prefix_key(cls, key):
 
        if key.startswith(cls.PREFIX):
 
            return key[len(cls.PREFIX):]
 
        return key
 

	
 
    @classmethod
 
    def get_by_key_name(cls, key, repo):
 
        row = cls.query() \
 
                .filter(cls.repository == repo) \
 
                .filter(cls.field_key == key).scalar()
 
        return row
 

	
 

	
 
class Repository(Base, BaseDbModel):
 
    __tablename__ = 'repositories'
 
    __table_args__ = (
 
        Index('r_repo_name_idx', 'repo_name'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_CLONE_URI = '{scheme}://{user}@{netloc}/{repo}'
 
    DEFAULT_CLONE_URI_ID = '{scheme}://{user}@{netloc}/_{repoid}'
 

	
 
    STATE_CREATED = u'repo_state_created'
 
    STATE_PENDING = u'repo_state_pending'
 
    STATE_ERROR = u'repo_state_error'
 

	
 
    repo_id = Column(Integer(), primary_key=True)
 
    repo_name = Column(Unicode(255), nullable=False, unique=True)
 
    repo_state = Column(String(255), nullable=False)
 

	
 
    clone_uri = Column(String(255), nullable=True) # FIXME: not nullable?
 
    repo_type = Column(String(255), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    private = Column(Boolean(), nullable=False)
 
    enable_statistics = Column("statistics", Boolean(), nullable=False, default=True)
 
    enable_downloads = Column("downloads", Boolean(), nullable=False, default=True)
 
    description = Column(Unicode(10000), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _landing_revision = Column("landing_revision", String(255), nullable=False)
 
    enable_locking = Column(Boolean(), nullable=False, default=False)
 
    _locked = Column("locked", String(255), nullable=True) # FIXME: not nullable?
 
    _changeset_cache = Column("changeset_cache", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
 

	
 
    fork_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=True)
 

	
 
    owner = relationship('User')
 
    fork = relationship('Repository', remote_side=repo_id)
 
    group = relationship('RepoGroup')
 
    repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
 
    users_group_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    stats = relationship('Statistics', cascade='all', uselist=False)
 

	
 
    followers = relationship('UserFollowing',
 
                             primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id',
 
                             cascade='all')
 
    extra_fields = relationship('RepositoryField',
 
                                cascade="all, delete-orphan")
 

	
 
    logs = relationship('UserLog')
 
    comments = relationship('ChangesetComment', cascade="all, delete-orphan")
 

	
 
    pull_requests_org = relationship('PullRequest',
 
                    primaryjoin='PullRequest.org_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    pull_requests_other = relationship('PullRequest',
 
                    primaryjoin='PullRequest.other_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
 
                                   safe_unicode(self.repo_name))
 

	
 
    @hybrid_property
 
    def landing_rev(self):
 
        # always should return [rev_type, rev]
 
        if self._landing_revision:
 
            _rev_info = self._landing_revision.split(':')
 
            if len(_rev_info) < 2:
 
                _rev_info.insert(0, 'rev')
 
            return [_rev_info[0], _rev_info[1]]
 
        return [None, None]
 

	
 
    @landing_rev.setter
 
    def landing_rev(self, val):
 
        if ':' not in val:
 
            raise ValueError('value must be delimited with `:` and consist '
 
                             'of <rev_type>:<rev>, got %s instead' % val)
 
        self._landing_revision = val
 

	
 
    @hybrid_property
 
    def locked(self):
 
        # always should return [user_id, timelocked]
 
        if self._locked:
 
            _lock_info = self._locked.split(':')
 
            return int(_lock_info[0]), _lock_info[1]
 
        return [None, None]
 

	
 
    @locked.setter
 
    def locked(self, val):
 
        if val and isinstance(val, (list, tuple)):
 
            self._locked = ':'.join(map(str, val))
 
        else:
 
            self._locked = None
 

	
 
    @hybrid_property
 
    def changeset_cache(self):
 
        try:
 
            cs_cache = json.loads(self._changeset_cache) # might raise on bad data
 
            cs_cache['raw_id'] # verify data, raise exception on error
 
            return cs_cache
 
        except (TypeError, KeyError, ValueError):
 
            return EmptyChangeset().__json__()
 

	
 
    @changeset_cache.setter
 
    def changeset_cache(self, val):
 
        try:
 
            self._changeset_cache = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    @classmethod
 
    def query(cls, sorted=False):
 
        """Add Repository-specific helpers for common query constructs.
 

	
 
        sorted: if True, apply the default ordering (name, case insensitive).
 
        """
 
        q = super(Repository, cls).query()
 

	
 
        if sorted:
 
            q = q.order_by(func.lower(Repository.repo_name))
 

	
 
        return q
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def normalize_repo_name(cls, repo_name):
 
        """
 
        Normalizes os specific repo_name to the format internally stored inside
 
        database using URL_SEP
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        return cls.url_sep().join(repo_name.split(os.sep))
 

	
 
    @classmethod
 
    def get_by_repo_name(cls, repo_name):
 
        q = Session().query(cls).filter(cls.repo_name == repo_name)
 
        q = q.options(joinedload(Repository.fork)) \
 
                .options(joinedload(Repository.owner)) \
 
                .options(joinedload(Repository.group))
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_full_path(cls, repo_full_path):
 
        base_full_path = os.path.realpath(cls.base_path())
 
        repo_full_path = os.path.realpath(repo_full_path)
 
        assert repo_full_path.startswith(base_full_path + os.path.sep)
 
        repo_name = repo_full_path[len(base_full_path) + 1:]
 
        repo_name = cls.normalize_repo_name(repo_name)
 
        return cls.get_by_repo_name(repo_name.strip(URL_SEP))
 

	
 
    @classmethod
 
    def get_repo_forks(cls, repo_id):
 
        return cls.query().filter(Repository.fork_id == repo_id)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path where all repos are stored
 

	
 
        :param cls:
 
        """
 
        q = Session().query(Ui) \
 
            .filter(Ui.ui_key == cls.url_sep())
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return q.one().ui_value
 

	
 
    @property
 
    def forks(self):
 
        """
 
        Return forks of this repo
 
        """
 
        return Repository.get_repo_forks(self.repo_id)
 

	
 
    @property
 
    def parent(self):
 
        """
 
        Returns fork parent
 
        """
 
        return self.fork
 

	
 
    @property
 
    def just_name(self):
 
        return self.repo_name.split(Repository.url_sep())[-1]
 

	
 
    @property
 
    def groups_with_parents(self):
 
        groups = []
 
        group = self.group
 
        while group is not None:
 
            groups.append(group)
 
            group = group.parent_group
 
            assert group not in groups, group # avoid recursion on bad db content
 
        groups.reverse()
 
        return groups
 

	
 
    @LazyProperty
 
    def repo_path(self):
 
        """
 
        Returns base full path for that repository means where it actually
 
        exists on a filesystem
 
        """
 
        q = Session().query(Ui).filter(Ui.ui_key ==
 
                                              Repository.url_sep())
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return q.one().ui_value
 

	
 
    @property
 
    def repo_full_path(self):
 
        p = [self.repo_path]
 
        # we need to split the name by / since this is how we store the
 
        # names in the database, but that eventually needs to be converted
 
        # into a valid system path
 
        p += self.repo_name.split(Repository.url_sep())
 
        return os.path.join(*map(safe_unicode, p))
 

	
 
    @property
 
    def cache_keys(self):
 
        """
 
        Returns associated cache keys for that repo
 
        """
 
        return CacheInvalidation.query() \
 
            .filter(CacheInvalidation.cache_args == self.repo_name) \
 
            .order_by(CacheInvalidation.cache_key) \
 
            .all()
 

	
 
    def get_new_name(self, repo_name):
 
        """
 
        returns new full repository name based on assigned group and new new
 

	
 
        :param group_name:
 
        """
 
        path_prefix = self.group.full_path_splitted if self.group else []
 
        return Repository.url_sep().join(path_prefix + [repo_name])
 

	
 
    @property
 
    def _ui(self):
 
        """
 
        Creates an db based ui object for this repository
 
        """
 
        from kallithea.lib.utils import make_ui
 
        return make_ui('db', clear_session=False)
 

	
 
    @classmethod
 
    def is_valid(cls, repo_name):
 
        """
 
        returns True if given repo name is a valid filesystem repository
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        from kallithea.lib.utils import is_valid_repo
 

	
 
        return is_valid_repo(repo_name, cls.base_path())
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating repo api data
 

	
 
        """
 
        repo = self
 
        data = dict(
 
            repo_id=repo.repo_id,
 
            repo_name=repo.repo_name,
 
            repo_type=repo.repo_type,
 
            clone_uri=repo.clone_uri,
 
            private=repo.private,
 
            created_on=repo.created_on,
 
            description=repo.description,
 
            landing_rev=repo.landing_rev,
 
            owner=repo.owner.username,
 
            fork_of=repo.fork.repo_name if repo.fork else None,
 
            enable_statistics=repo.enable_statistics,
 
            enable_locking=repo.enable_locking,
 
            enable_downloads=repo.enable_downloads,
 
            last_changeset=repo.changeset_cache,
 
            locked_by=User.get(self.locked[0]).get_api_data() \
 
                if self.locked[0] else None,
 
            locked_date=time_to_datetime(self.locked[1]) \
 
                if self.locked[1] else None
 
        )
 
        rc_config = Setting.get_app_settings()
 
        repository_fields = str2bool(rc_config.get('repository_fields'))
 
        if repository_fields:
 
            for f in self.extra_fields:
 
                data[f.field_key_prefixed] = f.field_value
 

	
 
        return data
 

	
 
    @classmethod
 
    def lock(cls, repo, user_id, lock_time=None):
 
        if lock_time is not None:
 
            lock_time = time.time()
 
        repo.locked = [user_id, lock_time]
 
        Session().commit()
 

	
 
    @classmethod
 
    def unlock(cls, repo):
 
        repo.locked = None
 
        Session().commit()
 

	
 
    @classmethod
 
    def getlock(cls, repo):
 
        return repo.locked
 

	
 
    @property
 
    def last_db_change(self):
 
        return self.updated_on
 

	
 
    @property
 
    def clone_uri_hidden(self):
 
        clone_uri = self.clone_uri
 
        if clone_uri:
 
            import urlobject
 
            url_obj = urlobject.URLObject(self.clone_uri)
 
            if url_obj.password:
 
                clone_uri = url_obj.with_password('*****')
 
        return clone_uri
 

	
 
    def clone_url(self, **override):
 
        import kallithea.lib.helpers as h
 
        qualified_home_url = h.canonical_url('home')
 

	
 
        uri_tmpl = None
 
        if 'with_id' in override:
 
            uri_tmpl = self.DEFAULT_CLONE_URI_ID
 
            del override['with_id']
 

	
 
        if 'uri_tmpl' in override:
 
            uri_tmpl = override['uri_tmpl']
 
            del override['uri_tmpl']
 

	
 
        # we didn't override our tmpl from **overrides
 
        if not uri_tmpl:
 
            uri_tmpl = self.DEFAULT_CLONE_URI
 
            try:
 
                from pylons import tmpl_context as c
 
                uri_tmpl = c.clone_uri_tmpl
 
            except AttributeError:
 
                # in any case if we call this outside of request context,
 
                # ie, not having tmpl_context set up
 
                pass
 

	
 
        return get_clone_url(uri_tmpl=uri_tmpl,
 
                             qualified_home_url=qualified_home_url,
 
                             repo_name=self.repo_name,
 
                             repo_id=self.repo_id, **override)
 

	
 
    def set_state(self, state):
 
        self.repo_state = state
 

	
 
    #==========================================================================
 
    # SCM PROPERTIES
 
    #==========================================================================
 

	
 
    def get_changeset(self, rev=None):
 
        return get_changeset_safe(self.scm_instance, rev)
 

	
 
    def get_landing_changeset(self):
 
        """
 
        Returns landing changeset, or if that doesn't exist returns the tip
 
        """
 
        _rev_type, _rev = self.landing_rev
 
        cs = self.get_changeset(_rev)
 
        if isinstance(cs, EmptyChangeset):
 
            return self.get_changeset()
 
        return cs
 

	
 
    def update_changeset_cache(self, cs_cache=None):
 
        """
 
        Update cache of last changeset for repository, keys should be::
 

	
 
            short_id
 
            raw_id
 
            revision
 
            message
 
            date
 
            author
 

	
 
        :param cs_cache:
 
        """
 
        from kallithea.lib.vcs.backends.base import BaseChangeset
 
        if cs_cache is None:
 
            cs_cache = EmptyChangeset()
 
            # use no-cache version here
 
            scm_repo = self.scm_instance_no_cache()
 
            if scm_repo:
 
                cs_cache = scm_repo.get_changeset()
 

	
 
        if isinstance(cs_cache, BaseChangeset):
 
            cs_cache = cs_cache.__json__()
 

	
 
        if (not self.changeset_cache or cs_cache['raw_id'] != self.changeset_cache['raw_id']):
 
            _default = datetime.datetime.fromtimestamp(0)
 
            last_change = cs_cache.get('date') or _default
 
            log.debug('updated repo %s with new cs cache %s',
 
                      self.repo_name, cs_cache)
 
            self.updated_on = last_change
 
            self.changeset_cache = cs_cache
 
            Session().commit()
 
        else:
 
            log.debug('changeset_cache for %s already up to date with %s',
 
                      self.repo_name, cs_cache['raw_id'])
 

	
 
    @property
 
    def tip(self):
 
        return self.get_changeset('tip')
 

	
 
    @property
 
    def author(self):
 
        return self.tip.author
 

	
 
    @property
 
    def last_change(self):
 
        return self.scm_instance.last_change
 

	
 
    def get_comments(self, revisions=None):
 
        """
 
        Returns comments for this repository grouped by revisions
 

	
 
        :param revisions: filter query by revisions only
 
        """
 
        cmts = ChangesetComment.query() \
 
            .filter(ChangesetComment.repo == self)
 
        if revisions is not None:
 
            if not revisions:
 
                return {} # don't use sql 'in' on empty set
 
            cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
 
        grouped = collections.defaultdict(list)
 
        for cmt in cmts.all():
 
            grouped[cmt.revision].append(cmt)
 
        return grouped
 

	
 
    def statuses(self, revisions):
 
        """
 
        Returns statuses for this repository.
 
        PRs without any votes do _not_ show up as unreviewed.
 

	
 
        :param revisions: list of revisions to get statuses for
 
        """
 
        if not revisions:
 
            return {}
 

	
 
        statuses = ChangesetStatus.query() \
 
            .filter(ChangesetStatus.repo == self) \
 
            .filter(ChangesetStatus.version == 0) \
 
            .filter(ChangesetStatus.revision.in_(revisions))
 

	
 
        grouped = {}
 
        for stat in statuses.all():
 
            pr_id = pr_nice_id = pr_repo = None
 
            if stat.pull_request:
 
                pr_id = stat.pull_request.pull_request_id
 
                pr_nice_id = PullRequest.make_nice_id(pr_id)
 
                pr_repo = stat.pull_request.other_repo.repo_name
 
            grouped[stat.revision] = [str(stat.status), stat.status_lbl,
 
                                      pr_id, pr_repo, pr_nice_id,
 
                                      stat.author]
 
        return grouped
 

	
 
    def _repo_size(self):
 
        from kallithea.lib import helpers as h
 
        log.debug('calculating repository size...')
 
        return h.format_byte_size(self.scm_instance.size)
 

	
 
    #==========================================================================
 
    # SCM CACHE INSTANCE
 
    #==========================================================================
 

	
 
    def set_invalidate(self):
 
        """
 
        Mark caches of this repo as invalid.
 
        """
 
        CacheInvalidation.set_invalidate(self.repo_name)
 

	
 
    _scm_instance = None
 

	
 
    @property
 
    def scm_instance(self):
 
        if self._scm_instance is None:
 
            self._scm_instance = self.scm_instance_cached()
 
        return self._scm_instance
 

	
 
    def scm_instance_cached(self, valid_cache_keys=None):
 
        @cache_region('long_term', 'scm_instance_cached')
 
        def _c(repo_name): # repo_name is just for the cache key
 
            log.debug('Creating new %s scm_instance and populating cache', repo_name)
 
            return self.scm_instance_no_cache()
 
        rn = self.repo_name
 

	
 
        valid = CacheInvalidation.test_and_set_valid(rn, None, valid_cache_keys=valid_cache_keys)
 
        if not valid:
 
            log.debug('Cache for %s invalidated, getting new object', rn)
 
            region_invalidate(_c, None, 'scm_instance_cached', rn)
 
        else:
 
            log.debug('Trying to get scm_instance of %s from cache', rn)
 
        return _c(rn)
 

	
 
    def scm_instance_no_cache(self):
 
        repo_full_path = safe_str(self.repo_full_path)
 
        alias = get_scm(repo_full_path)[0]
 
        log.debug('Creating instance of %s repository from %s',
 
                  alias, self.repo_full_path)
 
        backend = get_backend(alias)
 

	
 
        if alias == 'hg':
 
            repo = backend(repo_full_path, create=False,
 
                           baseui=self._ui)
 
        else:
 
            repo = backend(repo_full_path, create=False)
 

	
 
        return repo
 

	
 
    def __json__(self):
 
        return dict(landing_rev = self.landing_rev)
 

	
 
class RepoGroup(Base, BaseDbModel):
 
    __tablename__ = 'groups'
 
    __table_args__ = (
 
        CheckConstraint('group_id != group_parent_id', name='ck_groups_no_self_parent'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {'order_by': 'group_name'} # TODO: Deprecated as of SQLAlchemy 1.1.
 

	
 
    SEP = ' &raquo; '
 

	
 
    group_id = Column(Integer(), primary_key=True)
 
    group_name = Column(Unicode(255), nullable=False, unique=True) # full path
 
    group_parent_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=True)
 
    parent_group_id = Column('group_parent_id', Integer(), ForeignKey('groups.group_id'), nullable=True)
 
    group_description = Column(Unicode(10000), nullable=False)
 
    enable_locking = Column(Boolean(), nullable=False, default=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', cascade='all', order_by='UserRepoGroupToPerm.group_to_perm_id')
 
    users_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    parent_group = relationship('RepoGroup', remote_side=group_id)
 
    owner = relationship('User')
 

	
 
    @classmethod
 
    def query(cls, sorted=False):
 
        """Add RepoGroup-specific helpers for common query constructs.
 

	
 
        sorted: if True, apply the default ordering (name, case insensitive).
 
        """
 
        q = super(RepoGroup, cls).query()
 

	
 
        if sorted:
 
            q = q.order_by(func.lower(RepoGroup.group_name))
 

	
 
        return q
 

	
 
    def __init__(self, group_name='', parent_group=None):
 
        self.group_name = group_name
 
        self.parent_group = parent_group
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__, self.group_id,
 
                                      self.group_name)
 

	
 
    @classmethod
 
    def _generate_choice(cls, repo_group):
 
        """Return tuple with group_id and name as html literal"""
 
        from webhelpers.html import literal
 
        if repo_group is None:
 
            return (-1, u'-- %s --' % _('top level'))
 
        return repo_group.group_id, literal(cls.SEP.join(repo_group.full_path_splitted))
 

	
 
    @classmethod
 
    def groups_choices(cls, groups):
 
        """Return tuples with group_id and name as html literal."""
 
        return sorted((cls._generate_choice(g) for g in groups),
 
                      key=lambda c: c[1].split(cls.SEP))
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
 
        if case_insensitive:
 
            gr = cls.query() \
 
                .filter(func.lower(cls.group_name) == func.lower(group_name))
 
        else:
 
            gr = cls.query() \
 
                .filter(cls.group_name == group_name)
 
        if cache:
 
            gr = gr.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                            )
 
            )
 
        return gr.scalar()
 

	
 
    @property
 
    def parents(self):
 
        groups = []
 
        group = self.parent_group
 
        while group is not None:
 
            groups.append(group)
 
            group = group.parent_group
 
            assert group not in groups, group # avoid recursion on bad db content
 
        groups.reverse()
 
        return groups
 

	
 
    @property
 
    def children(self):
 
        return RepoGroup.query().filter(RepoGroup.parent_group == self)
 

	
 
    @property
 
    def name(self):
 
        return self.group_name.split(RepoGroup.url_sep())[-1]
 

	
 
    @property
 
    def full_path(self):
 
        return self.group_name
 

	
 
    @property
 
    def full_path_splitted(self):
 
        return self.group_name.split(RepoGroup.url_sep())
 

	
 
    @property
 
    def repositories(self):
 
        return Repository.query(sorted=True).filter_by(group=self)
 

	
 
    @property
 
    def repositories_recursive_count(self):
 
        cnt = self.repositories.count()
 

	
 
        def children_count(group):
 
            cnt = 0
 
            for child in group.children:
 
                cnt += child.repositories.count()
 
                cnt += children_count(child)
 
            return cnt
 

	
 
        return cnt + children_count(self)
 

	
 
    def _recursive_objects(self, include_repos=True):
 
        all_ = []
 

	
 
        def _get_members(root_gr):
 
            if include_repos:
 
                for r in root_gr.repositories:
 
                    all_.append(r)
 
            childs = root_gr.children.all()
 
            if childs:
 
                for gr in childs:
 
                    all_.append(gr)
 
                    _get_members(gr)
 

	
 
        _get_members(self)
 
        return [self] + all_
 

	
 
    def recursive_groups_and_repos(self):
 
        """
 
        Recursive return all groups, with repositories in those groups
 
        """
 
        return self._recursive_objects()
 

	
 
    def recursive_groups(self):
 
        """
 
        Returns all children groups for this group including children of children
 
        """
 
        return self._recursive_objects(include_repos=False)
 

	
 
    def get_new_name(self, group_name):
 
        """
 
        returns new full group name based on parent and new name
 

	
 
        :param group_name:
 
        """
 
        path_prefix = (self.parent_group.full_path_splitted if
 
                       self.parent_group else [])
 
        return RepoGroup.url_sep().join(path_prefix + [group_name])
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating api data
 

	
 
        """
 
        group = self
 
        data = dict(
 
            group_id=group.group_id,
 
            group_name=group.group_name,
 
            group_description=group.group_description,
 
            parent_group=group.parent_group.group_name if group.parent_group else None,
 
            repositories=[x.repo_name for x in group.repositories],
 
            owner=group.owner.username
 
        )
 
        return data
 

	
 

	
 
class Permission(Base, BaseDbModel):
 
    __tablename__ = 'permissions'
 
    __table_args__ = (
 
        Index('p_perm_name_idx', 'permission_name'),
 
        _table_args_default_dict,
 
    )
 

	
 
    PERMS = [
 
        ('hg.admin', _('Kallithea Administrator')),
 

	
 
        ('repository.none', _('Default user has no access to new repositories')),
 
        ('repository.read', _('Default user has read access to new repositories')),
 
        ('repository.write', _('Default user has write access to new repositories')),
 
        ('repository.admin', _('Default user has admin access to new repositories')),
 

	
 
        ('group.none', _('Default user has no access to new repository groups')),
 
        ('group.read', _('Default user has read access to new repository groups')),
 
        ('group.write', _('Default user has write access to new repository groups')),
 
        ('group.admin', _('Default user has admin access to new repository groups')),
 

	
 
        ('usergroup.none', _('Default user has no access to new user groups')),
 
        ('usergroup.read', _('Default user has read access to new user groups')),
 
        ('usergroup.write', _('Default user has write access to new user groups')),
 
        ('usergroup.admin', _('Default user has admin access to new user groups')),
 

	
 
        ('hg.repogroup.create.false', _('Only admins can create repository groups')),
 
        ('hg.repogroup.create.true', _('Non-admins can create repository groups')),
 

	
 
        ('hg.usergroup.create.false', _('Only admins can create user groups')),
 
        ('hg.usergroup.create.true', _('Non-admins can create user groups')),
 

	
 
        ('hg.create.none', _('Only admins can create top level repositories')),
 
        ('hg.create.repository', _('Non-admins can create top level repositories')),
 

	
 
        ('hg.create.write_on_repogroup.true', _('Repository creation enabled with write permission to a repository group')),
 
        ('hg.create.write_on_repogroup.false', _('Repository creation disabled with write permission to a repository group')),
 

	
 
        ('hg.fork.none', _('Only admins can fork repositories')),
 
        ('hg.fork.repository', _('Non-admins can fork repositories')),
 

	
 
        ('hg.register.none', _('Registration disabled')),
 
        ('hg.register.manual_activate', _('User registration with manual account activation')),
 
        ('hg.register.auto_activate', _('User registration with automatic account activation')),
 

	
 
        ('hg.extern_activate.manual', _('Manual activation of external account')),
 
        ('hg.extern_activate.auto', _('Automatic activation of external account')),
 
    ]
 

	
 
    #definition of system default permissions for DEFAULT user
 
    DEFAULT_USER_PERMISSIONS = [
 
        'repository.read',
 
        'group.read',
 
        'usergroup.read',
 
        'hg.create.repository',
 
        'hg.create.write_on_repogroup.true',
 
        'hg.fork.repository',
 
        'hg.register.manual_activate',
 
        'hg.extern_activate.auto',
 
    ]
 

	
 
    # defines which permissions are more important higher the more important
 
    # Weight defines which permissions are more important.
 
    # The higher number the more important.
 
    PERM_WEIGHTS = {
 
        'repository.none': 0,
 
        'repository.read': 1,
 
        'repository.write': 3,
 
        'repository.admin': 4,
 

	
 
        'group.none': 0,
 
        'group.read': 1,
 
        'group.write': 3,
 
        'group.admin': 4,
 

	
 
        'usergroup.none': 0,
 
        'usergroup.read': 1,
 
        'usergroup.write': 3,
 
        'usergroup.admin': 4,
 

	
 
        'hg.repogroup.create.false': 0,
 
        'hg.repogroup.create.true': 1,
 

	
 
        'hg.usergroup.create.false': 0,
 
        'hg.usergroup.create.true': 1,
 

	
 
        'hg.fork.none': 0,
 
        'hg.fork.repository': 1,
 

	
 
        'hg.create.none': 0,
 
        'hg.create.repository': 1
 
    }
 

	
 
    permission_id = Column(Integer(), primary_key=True)
 
    permission_name = Column(String(255), nullable=False)
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (
 
            self.__class__.__name__, self.permission_id, self.permission_name
 
        )
 

	
 
    @classmethod
 
    def get_by_key(cls, key):
 
        return cls.query().filter(cls.permission_name == key).scalar()
 

	
 
    @classmethod
 
    def get_default_perms(cls, default_user_id):
 
        q = Session().query(UserRepoToPerm, Repository, cls) \
 
         .join((Repository, UserRepoToPerm.repository_id == Repository.repo_id)) \
 
         .join((cls, UserRepoToPerm.permission_id == cls.permission_id)) \
 
         .filter(UserRepoToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 
    @classmethod
 
    def get_default_group_perms(cls, default_user_id):
 
        q = Session().query(UserRepoGroupToPerm, RepoGroup, cls) \
 
         .join((RepoGroup, UserRepoGroupToPerm.group_id == RepoGroup.group_id)) \
 
         .join((cls, UserRepoGroupToPerm.permission_id == cls.permission_id)) \
 
         .filter(UserRepoGroupToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 
    @classmethod
 
    def get_default_user_group_perms(cls, default_user_id):
 
        q = Session().query(UserUserGroupToPerm, UserGroup, cls) \
 
         .join((UserGroup, UserUserGroupToPerm.user_group_id == UserGroup.users_group_id)) \
 
         .join((cls, UserUserGroupToPerm.permission_id == cls.permission_id)) \
 
         .filter(UserUserGroupToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 

	
 
class UserRepoToPerm(Base, BaseDbModel):
 
    __tablename__ = 'repo_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'repository_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    repo_to_perm_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, user, repository, permission):
 
        n = cls()
 
        n.user = user
 
        n.repository = repository
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<%s => %s >' % (self.user, self.repository)
 

	
 

	
 
class UserUserGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'user_user_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'user_group_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_user_group_to_perm_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 
    user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    user_group = relationship('UserGroup')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, user, user_group, permission):
 
        n = cls()
 
        n.user = user
 
        n.user_group = user_group
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<%s => %s >' % (self.user, self.user_group)
 

	
 

	
 
class UserToPerm(Base, BaseDbModel):
 
    __tablename__ = 'user_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_to_perm_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    permission = relationship('Permission')
 

	
 
    def __unicode__(self):
 
        return u'<%s => %s >' % (self.user, self.permission)
 

	
 

	
 
class UserGroupRepoToPerm(Base, BaseDbModel):
 
    __tablename__ = 'users_group_repo_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'users_group_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_to_perm_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 

	
 
    users_group = relationship('UserGroup')
 
    permission = relationship('Permission')
 
    repository = relationship('Repository')
 

	
 
    @classmethod
 
    def create(cls, users_group, repository, permission):
 
        n = cls()
 
        n.users_group = users_group
 
        n.repository = repository
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<UserGroupRepoToPerm:%s => %s >' % (self.users_group, self.repository)
 

	
 

	
 
class UserGroupUserGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'user_group_user_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('target_user_group_id', 'user_group_id', 'permission_id'),
 
        CheckConstraint('target_user_group_id != user_group_id', name='ck_user_group_user_group_to_perm_no_self_target'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_group_user_group_to_perm_id = Column(Integer(), primary_key=True)
 
    target_user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 
    user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 

	
 
    target_user_group = relationship('UserGroup', primaryjoin='UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id')
 
    user_group = relationship('UserGroup', primaryjoin='UserGroupUserGroupToPerm.user_group_id==UserGroup.users_group_id')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, target_user_group, user_group, permission):
 
        n = cls()
 
        n.target_user_group = target_user_group
 
        n.user_group = user_group
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<UserGroupUserGroup:%s => %s >' % (self.target_user_group, self.user_group)
 

	
 

	
 
class UserGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'users_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('users_group_id', 'permission_id',),
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_to_perm_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 

	
 
    users_group = relationship('UserGroup')
 
    permission = relationship('Permission')
 

	
 

	
 
class UserRepoGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'user_repo_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'group_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    group_to_perm_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    group = relationship('RepoGroup')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, user, repository_group, permission):
 
        n = cls()
 
        n.user = user
 
        n.group = repository_group
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 

	
 
class UserGroupRepoGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'users_group_repo_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('users_group_id', 'group_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_repo_group_to_perm_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 

	
 
    users_group = relationship('UserGroup')
 
    permission = relationship('Permission')
 
    group = relationship('RepoGroup')
 

	
 
    @classmethod
 
    def create(cls, user_group, repository_group, permission):
 
        n = cls()
 
        n.users_group = user_group
 
        n.group = repository_group
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 

	
 
class Statistics(Base, BaseDbModel):
 
    __tablename__ = 'statistics'
 
    __table_args__ = (
 
         _table_args_default_dict,
 
    )
 

	
 
    stat_id = Column(Integer(), primary_key=True)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=True)
 
    stat_on_revision = Column(Integer(), nullable=False)
 
    commit_activity = Column(LargeBinary(1000000), nullable=False)#JSON data
 
    commit_activity_combined = Column(LargeBinary(), nullable=False)#JSON data
 
    languages = Column(LargeBinary(1000000), nullable=False)#JSON data
 

	
 
    repository = relationship('Repository', single_parent=True)
 

	
 

	
 
class UserFollowing(Base, BaseDbModel):
 
    __tablename__ = 'user_followings'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'follows_repository_id', name='uq_user_followings_user_repo'),
 
        UniqueConstraint('user_id', 'follows_user_id', name='uq_user_followings_user_user'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_following_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    follows_repo_id = Column("follows_repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    follows_user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
 
    follows_from = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User', primaryjoin='User.user_id==UserFollowing.user_id')
 

	
 
    follows_user = relationship('User', primaryjoin='User.user_id==UserFollowing.follows_user_id')
 
    follows_repository = relationship('Repository', order_by=lambda: func.lower(Repository.repo_name))
 

	
 
    @classmethod
 
    def get_repo_followers(cls, repo_id):
 
        return cls.query().filter(cls.follows_repo_id == repo_id)
 

	
 

	
 
class CacheInvalidation(Base, BaseDbModel):
 
    __tablename__ = 'cache_invalidation'
 
    __table_args__ = (
 
        Index('key_idx', 'cache_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    # cache_id, not used
 
    cache_id = Column(Integer(), primary_key=True)
 
    # cache_key as created by _get_cache_key
 
    cache_key = Column(Unicode(255), nullable=False, unique=True)
 
    # cache_args is a repo_name
 
    cache_args = Column(Unicode(255), nullable=False)
 
    # instance sets cache_active True when it is caching, other instances set
 
    # cache_active to False to indicate that this cache is invalid
 
    cache_active = Column(Boolean(), nullable=False, default=False)
 

	
 
    def __init__(self, cache_key, repo_name=''):
 
        self.cache_key = cache_key
 
        self.cache_args = repo_name
 
        self.cache_active = False
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s[%s]')>" % (
 
            self.__class__.__name__,
 
            self.cache_id, self.cache_key, self.cache_active)
 

	
 
    def _cache_key_partition(self):
 
        prefix, repo_name, suffix = self.cache_key.partition(self.cache_args)
 
        return prefix, repo_name, suffix
 

	
 
    def get_prefix(self):
 
        """
 
        get prefix that might have been used in _get_cache_key to
 
        generate self.cache_key. Only used for informational purposes
 
        in repo_edit.html.
 
        """
 
        # prefix, repo_name, suffix
 
        return self._cache_key_partition()[0]
 

	
 
    def get_suffix(self):
 
        """
 
        get suffix that might have been used in _get_cache_key to
 
        generate self.cache_key. Only used for informational purposes
 
        in repo_edit.html.
 
        """
 
        # prefix, repo_name, suffix
 
        return self._cache_key_partition()[2]
 

	
 
    @classmethod
 
    def clear_cache(cls):
 
        """
 
        Delete all cache keys from database.
 
        Should only be run when all instances are down and all entries thus stale.
 
        """
 
        cls.query().delete()
 
        Session().commit()
 

	
 
    @classmethod
 
    def _get_cache_key(cls, key):
 
        """
 
        Wrapper for generating a unique cache key for this instance and "key".
 
        key must / will start with a repo_name which will be stored in .cache_args .
 
        """
 
        import kallithea
 
        prefix = kallithea.CONFIG.get('instance_id', '')
 
        return "%s%s" % (prefix, key)
 

	
 
    @classmethod
 
    def set_invalidate(cls, repo_name):
 
        """
 
        Mark all caches of a repo as invalid in the database.
 
        """
 
        inv_objs = Session().query(cls).filter(cls.cache_args == repo_name).all()
 
        log.debug('for repo %s got %s invalidation objects',
 
                  safe_str(repo_name), inv_objs)
 

	
 
        for inv_obj in inv_objs:
 
            log.debug('marking %s key for invalidation based on repo_name=%s',
 
                      inv_obj, safe_str(repo_name))
 
            Session().delete(inv_obj)
 
        Session().commit()
 

	
 
    @classmethod
 
    def test_and_set_valid(cls, repo_name, kind, valid_cache_keys=None):
 
        """
 
        Mark this cache key as active and currently cached.
 
        Return True if the existing cache registration still was valid.
 
        Return False to indicate that it had been invalidated and caches should be refreshed.
 
        """
 

	
 
        key = (repo_name + '_' + kind) if kind else repo_name
 
        cache_key = cls._get_cache_key(key)
 

	
 
        if valid_cache_keys and cache_key in valid_cache_keys:
 
            return True
 

	
 
        inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
        if inv_obj is None:
 
            inv_obj = cls(cache_key, repo_name)
 
            Session().add(inv_obj)
 
        elif inv_obj.cache_active:
 
            return True
 
        inv_obj.cache_active = True
 
        try:
 
            Session().commit()
 
        except sqlalchemy.exc.IntegrityError:
 
            log.error('commit of CacheInvalidation failed - retrying')
 
            Session().rollback()
 
            inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
            if inv_obj is None:
 
                log.error('failed to create CacheInvalidation entry')
 
                # TODO: fail badly?
 
            # else: TOCTOU - another thread added the key at the same time; no further action required
 
        return False
 

	
 
    @classmethod
 
    def get_valid_cache_keys(cls):
 
        """
 
        Return opaque object with information of which caches still are valid
 
        and can be used without checking for invalidation.
 
        """
 
        return set(inv_obj.cache_key for inv_obj in cls.query().filter(cls.cache_active).all())
 

	
 

	
 
class ChangesetComment(Base, BaseDbModel):
 
    __tablename__ = 'changeset_comments'
 
    __table_args__ = (
 
        Index('cc_revision_idx', 'revision'),
 
        Index('cc_pull_request_id_idx', 'pull_request_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    comment_id = Column(Integer(), primary_key=True)
 
    repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    revision = Column(String(40), nullable=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
 
    line_no = Column(Unicode(10), nullable=True)
 
    f_path = Column(Unicode(1000), nullable=True)
 
    author_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    text = Column(UnicodeText(), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    author = relationship('User')
 
    repo = relationship('Repository')
 
    # status_change is frequently used directly in templates - make it a lazy
 
    # join to avoid fetching each related ChangesetStatus on demand.
 
    # There will only be one ChangesetStatus referencing each comment so the join will not explode.
 
    status_change = relationship('ChangesetStatus',
 
                                 cascade="all, delete-orphan", lazy='joined')
 
    pull_request = relationship('PullRequest')
 

	
 
    @classmethod
 
    def get_users(cls, revision=None, pull_request_id=None):
 
        """
 
        Returns user associated with this ChangesetComment. ie those
 
        who actually commented
 

	
 
        :param cls:
 
        :param revision:
 
        """
 
        q = Session().query(User) \
 
                .join(ChangesetComment.author)
 
        if revision is not None:
 
            q = q.filter(cls.revision == revision)
 
        elif pull_request_id is not None:
 
            q = q.filter(cls.pull_request_id == pull_request_id)
 
        return q.all()
 

	
 
    def url(self):
 
        anchor = "comment-%s" % self.comment_id
 
        import kallithea.lib.helpers as h
 
        if self.revision:
 
            return h.url('changeset_home', repo_name=self.repo.repo_name, revision=self.revision, anchor=anchor)
 
        elif self.pull_request_id is not None:
 
            return self.pull_request.url(anchor=anchor)
 

	
 
    def deletable(self):
 
        return self.created_on > datetime.datetime.now() - datetime.timedelta(minutes=5)
 

	
 

	
 
class ChangesetStatus(Base, BaseDbModel):
 
    __tablename__ = 'changeset_statuses'
 
    __table_args__ = (
 
        Index('cs_revision_idx', 'revision'),
 
        Index('cs_version_idx', 'version'),
 
        Index('cs_pull_request_id_idx', 'pull_request_id'),
 
        Index('cs_changeset_comment_id_idx', 'changeset_comment_id'),
 
        Index('cs_pull_request_id_user_id_version_idx', 'pull_request_id', 'user_id', 'version'),
 
        Index('cs_repo_id_pull_request_id_idx', 'repo_id', 'pull_request_id'),
 
        UniqueConstraint('repo_id', 'revision', 'version'),
 
        _table_args_default_dict,
 
    )
 

	
 
    STATUS_NOT_REVIEWED = DEFAULT = 'not_reviewed'
 
    STATUS_APPROVED = 'approved'
 
    STATUS_REJECTED = 'rejected' # is shown as "Not approved" - TODO: change database content / scheme
 
    STATUS_UNDER_REVIEW = 'under_review'
 

	
 
    STATUSES = [
 
        (STATUS_NOT_REVIEWED, _("Not reviewed")),  # (no icon) and default
 
        (STATUS_UNDER_REVIEW, _("Under review")),
 
        (STATUS_REJECTED, _("Not approved")),
 
        (STATUS_APPROVED, _("Approved")),
 
    ]
 
    STATUSES_DICT = dict(STATUSES)
 

	
 
    changeset_status_id = Column(Integer(), primary_key=True)
 
    repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    revision = Column(String(40), nullable=True)
 
    status = Column(String(128), nullable=False, default=DEFAULT)
 
    changeset_comment_id = Column(Integer(), ForeignKey('changeset_comments.comment_id'), nullable=False)
 
    modified_at = Column(DateTime(), nullable=False, default=datetime.datetime.now)
 
    version = Column(Integer(), nullable=False, default=0)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
 

	
 
    author = relationship('User')
 
    repo = relationship('Repository')
 
    comment = relationship('ChangesetComment')
 
    pull_request = relationship('PullRequest')
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (
 
            self.__class__.__name__,
 
            self.status, self.author
 
        )
 

	
 
    @classmethod
 
    def get_status_lbl(cls, value):
 
        return cls.STATUSES_DICT.get(value)
 

	
 
    @property
 
    def status_lbl(self):
 
        return ChangesetStatus.get_status_lbl(self.status)
 

	
 

	
 
class PullRequest(Base, BaseDbModel):
 
    __tablename__ = 'pull_requests'
 
    __table_args__ = (
 
        Index('pr_org_repo_id_idx', 'org_repo_id'),
 
        Index('pr_other_repo_id_idx', 'other_repo_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    # values for .status
 
    STATUS_NEW = u'new'
 
    STATUS_CLOSED = u'closed'
 

	
 
    pull_request_id = Column(Integer(), primary_key=True)
 
    title = Column(Unicode(255), nullable=False)
 
    description = Column(UnicodeText(), nullable=False)
 
    status = Column(Unicode(255), nullable=False, default=STATUS_NEW) # only for closedness, not approve/reject/etc
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _revisions = Column('revisions', UnicodeText(), nullable=False)
 
    org_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    org_ref = Column(Unicode(255), nullable=False)
 
    other_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    other_ref = Column(Unicode(255), nullable=False)
 

	
 
    @hybrid_property
 
    def revisions(self):
 
        return self._revisions.split(':')
 

	
 
    @revisions.setter
 
    def revisions(self, val):
 
        self._revisions = safe_unicode(':'.join(val))
 

	
 
    @property
 
    def org_ref_parts(self):
 
        return self.org_ref.split(':')
 

	
 
    @property
 
    def other_ref_parts(self):
 
        return self.other_ref.split(':')
 

	
 
    owner = relationship('User')
 
    reviewers = relationship('PullRequestReviewers',
 
                             cascade="all, delete-orphan")
 
    org_repo = relationship('Repository', primaryjoin='PullRequest.org_repo_id==Repository.repo_id')
 
    other_repo = relationship('Repository', primaryjoin='PullRequest.other_repo_id==Repository.repo_id')
 
    statuses = relationship('ChangesetStatus', order_by='ChangesetStatus.changeset_status_id')
 
    comments = relationship('ChangesetComment', order_by='ChangesetComment.comment_id',
 
                             cascade="all, delete-orphan")
 

	
 
    @classmethod
 
    def query(cls, reviewer_id=None, include_closed=True, sorted=False):
 
        """Add PullRequest-specific helpers for common query constructs.
 

	
 
        reviewer_id: only PRs with the specified user added as reviewer.
 

	
 
        include_closed: if False, do not include closed PRs.
 

	
 
        sorted: if True, apply the default ordering (newest first).
 
        """
 
        q = super(PullRequest, cls).query()
 

	
 
        if reviewer_id is not None:
 
            q = q.join(PullRequestReviewers).filter(PullRequestReviewers.user_id == reviewer_id)
 

	
 
        if not include_closed:
 
            q = q.filter(PullRequest.status != PullRequest.STATUS_CLOSED)
 

	
 
        if sorted:
 
            q = q.order_by(PullRequest.created_on.desc())
 

	
 
        return q
 

	
 
    def get_reviewer_users(self):
 
        """Like .reviewers, but actually returning the users"""
 
        return User.query() \
 
            .join(PullRequestReviewers) \
 
            .filter(PullRequestReviewers.pull_request == self) \
 
            .order_by(PullRequestReviewers.pull_request_reviewers_id) \
 
            .all()
 

	
 
    def is_closed(self):
 
        return self.status == self.STATUS_CLOSED
 

	
 
    def user_review_status(self, user_id):
 
        """Return the user's latest status votes on PR"""
 
        # note: no filtering on repo - that would be redundant
 
        status = ChangesetStatus.query() \
 
            .filter(ChangesetStatus.pull_request == self) \
 
            .filter(ChangesetStatus.user_id == user_id) \
 
            .order_by(ChangesetStatus.version) \
 
            .first()
 
        return str(status.status) if status else ''
 

	
 
    @classmethod
 
    def make_nice_id(cls, pull_request_id):
 
        '''Return pull request id nicely formatted for displaying'''
 
        return '#%s' % pull_request_id
 

	
 
    def nice_id(self):
 
        '''Return the id of this pull request, nicely formatted for displaying'''
 
        return self.make_nice_id(self.pull_request_id)
 

	
 
    def __json__(self):
 
        return dict(
 
            revisions=self.revisions
 
        )
 

	
 
    def url(self, **kwargs):
 
        canonical = kwargs.pop('canonical', None)
 
        import kallithea.lib.helpers as h
 
        b = self.org_ref_parts[1]
 
        if b != self.other_ref_parts[1]:
 
            s = '/_/' + b
 
        else:
 
            s = '/_/' + self.title
 
        kwargs['extra'] = urlreadable(s)
 
        if canonical:
 
            return h.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                                   pull_request_id=self.pull_request_id, **kwargs)
 
        return h.url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                     pull_request_id=self.pull_request_id, **kwargs)
 

	
 
class PullRequestReviewers(Base, BaseDbModel):
 
    __tablename__ = 'pull_request_reviewers'
 
    __table_args__ = (
 
        Index('pull_request_reviewers_user_id_idx', 'user_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    def __init__(self, user=None, pull_request=None):
 
        self.user = user
 
        self.pull_request = pull_request
 

	
 
    pull_request_reviewers_id = Column('pull_requests_reviewers_id', Integer(), primary_key=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    pull_request = relationship('PullRequest')
 

	
 

	
 
class Notification(Base, BaseDbModel):
 
    __tablename__ = 'notifications'
 
    __table_args__ = (
 
        Index('notification_type_idx', 'type'),
 
        _table_args_default_dict,
 
    )
 

	
 
    TYPE_CHANGESET_COMMENT = u'cs_comment'
 
    TYPE_MESSAGE = u'message'
 
    TYPE_MENTION = u'mention' # not used
 
    TYPE_REGISTRATION = u'registration'
 
    TYPE_PULL_REQUEST = u'pull_request'
 
    TYPE_PULL_REQUEST_COMMENT = u'pull_request_comment'
 

	
 
    notification_id = Column(Integer(), primary_key=True)
 
    subject = Column(Unicode(512), nullable=False)
 
    body = Column(UnicodeText(), nullable=False)
 
    created_by = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    type_ = Column('type', Unicode(255), nullable=False)
 

	
 
    created_by_user = relationship('User')
 
    notifications_to_users = relationship('UserNotification', cascade="all, delete-orphan")
 

	
 
    @property
 
    def recipients(self):
 
        return [x.user for x in UserNotification.query()
 
                .filter(UserNotification.notification == self)
 
                .order_by(UserNotification.user_id.asc()).all()]
 

	
 
    @classmethod
 
    def create(cls, created_by, subject, body, recipients, type_=None):
 
        if type_ is None:
 
            type_ = Notification.TYPE_MESSAGE
 

	
 
        notification = cls()
 
        notification.created_by_user = created_by
 
        notification.subject = subject
 
        notification.body = body
 
        notification.type_ = type_
 
        notification.created_on = datetime.datetime.now()
 

	
 
        for recipient in recipients:
 
            un = UserNotification()
 
            un.notification = notification
 
            un.user_id = recipient.user_id
 
            # Mark notifications to self "pre-read" - should perhaps just be skipped
 
            if recipient == created_by:
 
                un.read = True
 
            Session().add(un)
 

	
 
        Session().add(notification)
 
        Session().flush() # assign notification.notification_id
 
        return notification
 

	
 
    @property
 
    def description(self):
 
        from kallithea.model.notification import NotificationModel
 
        return NotificationModel().make_description(self)
 

	
 

	
 
class UserNotification(Base, BaseDbModel):
 
    __tablename__ = 'user_to_notification'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'notification_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), primary_key=True)
 
    notification_id = Column(Integer(), ForeignKey('notifications.notification_id'), primary_key=True)
 
    read = Column(Boolean, nullable=False, default=False)
 
    sent_on = Column(DateTime(timezone=False), nullable=True) # FIXME: not nullable?
 

	
 
    user = relationship('User')
 
    notification = relationship('Notification')
 

	
 
    def mark_as_read(self):
 
        self.read = True
 

	
 

	
 
class Gist(Base, BaseDbModel):
 
    __tablename__ = 'gists'
 
    __table_args__ = (
 
        Index('g_gist_access_id_idx', 'gist_access_id'),
 
        Index('g_created_on_idx', 'created_on'),
 
        _table_args_default_dict,
 
    )
 

	
 
    GIST_PUBLIC = u'public'
 
    GIST_PRIVATE = u'private'
 
    DEFAULT_FILENAME = u'gistfile1.txt'
 

	
 
    gist_id = Column(Integer(), primary_key=True)
 
    gist_access_id = Column(Unicode(250), nullable=False)
 
    gist_description = Column(UnicodeText(), nullable=False)
 
    gist_owner = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    gist_expires = Column(Float(53), nullable=False)
 
    gist_type = Column(Unicode(128), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    owner = relationship('User')
 

	
 
    def __repr__(self):
 
        return '<Gist:[%s]%s>' % (self.gist_type, self.gist_access_id)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        res = cls.query().filter(cls.gist_access_id == id_).scalar()
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def get_by_access_id(cls, gist_access_id):
 
        return cls.query().filter(cls.gist_access_id == gist_access_id).scalar()
 

	
 
    def gist_url(self):
 
        import kallithea
 
        alias_url = kallithea.CONFIG.get('gist_alias_url')
 
        if alias_url:
 
            return alias_url.replace('{gistid}', self.gist_access_id)
 

	
 
        import kallithea.lib.helpers as h
 
        return h.canonical_url('gist', gist_id=self.gist_access_id)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path where all gists are stored
 

	
 
        :param cls:
 
        """
 
        from kallithea.model.gist import GIST_STORE_LOC
 
        q = Session().query(Ui) \
 
            .filter(Ui.ui_key == URL_SEP)
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return os.path.join(q.one().ui_value, GIST_STORE_LOC)
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating gist related data for API
 
        """
 
        gist = self
 
        data = dict(
 
            gist_id=gist.gist_id,
 
            type=gist.gist_type,
 
            access_id=gist.gist_access_id,
 
            description=gist.gist_description,
 
            url=gist.gist_url(),
 
            expires=gist.gist_expires,
 
            created_on=gist.created_on,
 
        )
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
        )
 
        data.update(self.get_api_data())
 
        return data
 
    ## SCM functions
 

	
 
    @property
 
    def scm_instance(self):
 
        from kallithea.lib.vcs import get_repo
 
        base_path = self.base_path()
 
        return get_repo(os.path.join(*map(safe_str,
 
                                          [base_path, self.gist_access_id])))
kallithea/model/forms.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
these are form validation classes
 
http://formencode.org/module-formencode.validators.html
 
for list of all available validators
 

	
 
we can create our own validators
 

	
 
The table below outlines the options which can be used in a schema in addition to the validators themselves
 
pre_validators          []     These validators will be applied before the schema
 
chained_validators      []     These validators will be applied after the schema
 
allow_extra_fields      False     If True, then it is not an error when keys that aren't associated with a validator are present
 
filter_extra_fields     False     If True, then keys that aren't associated with a validator are removed
 
if_key_missing          NoDefault If this is given, then any keys that aren't available but are expected will be replaced with this value (and then validated). This does not override a present .if_missing attribute on validators. NoDefault is a special FormEncode class to mean that no default values has been specified and therefore missing keys shouldn't take a default value.
 
ignore_key_missing      False     If True, then missing keys will be missing in the result, if the validator doesn't have .if_missing on it already
 

	
 

	
 
<name> = formencode.validators.<name of validator>
 
<name> must equal form name
 
list=[1,2,3,4,5]
 
for SELECT use formencode.All(OneOf(list), Int())
 

	
 
"""
 
import logging
 

	
 
import formencode
 
from formencode import All
 

	
 
from pylons.i18n.translation import _
 

	
 
from kallithea import BACKENDS
 
from kallithea.model import validators as v
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class LoginForm(formencode.Schema):
 
    allow_extra_fields = True
 
    filter_extra_fields = True
 
    username = v.UnicodeString(
 
        strip=True,
 
        min=1,
 
        not_empty=True,
 
        messages={
 
           'empty': _('Please enter a login'),
 
           'tooShort': _('Enter a value %(min)i characters long or more')}
 
    )
 

	
 
    password = v.UnicodeString(
 
        strip=False,
 
        min=3,
 
        not_empty=True,
 
        messages={
 
            'empty': _('Please enter a password'),
 
            'tooShort': _('Enter %(min)i characters or more')}
 
    )
 

	
 
    remember = v.StringBoolean(if_missing=False)
 

	
 
    chained_validators = [v.ValidAuth()]
 

	
 

	
 
def PasswordChangeForm(username):
 
    class _PasswordChangeForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        current_password = v.ValidOldPassword(username)(not_empty=True)
 
        new_password = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
 
        new_password_confirmation = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
 

	
 
        chained_validators = [v.ValidPasswordsMatch('new_password',
 
                                                    'new_password_confirmation')]
 
    return _PasswordChangeForm
 

	
 

	
 
def UserForm(edit=False, old_data=None):
 
    old_data = old_data or {}
 
    class _UserForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                       v.ValidUsername(edit, old_data))
 
        if edit:
 
            new_password = All(
 
                v.ValidPassword(),
 
                v.UnicodeString(strip=False, min=6, not_empty=False)
 
            )
 
            password_confirmation = All(
 
                v.ValidPassword(),
 
                v.UnicodeString(strip=False, min=6, not_empty=False),
 
            )
 
            admin = v.StringBoolean(if_missing=False)
 
            chained_validators = [v.ValidPasswordsMatch('new_password',
 
                                                        'password_confirmation')]
 
        else:
 
            password = All(
 
                v.ValidPassword(),
 
                v.UnicodeString(strip=False, min=6, not_empty=True)
 
            )
 
            password_confirmation = All(
 
                v.ValidPassword(),
 
                v.UnicodeString(strip=False, min=6, not_empty=False)
 
            )
 
            chained_validators = [v.ValidPasswordsMatch('password',
 
                                                        'password_confirmation')]
 

	
 
        active = v.StringBoolean(if_missing=False)
 
        firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
 
        extern_name = v.UnicodeString(strip=True, if_missing=None)
 
        extern_type = v.UnicodeString(strip=True, if_missing=None)
 
    return _UserForm
 

	
 

	
 
def UserGroupForm(edit=False, old_data=None, available_members=None):
 
    old_data = old_data or {}
 
    available_members = available_members or []
 
    class _UserGroupForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        users_group_name = All(
 
            v.UnicodeString(strip=True, min=1, not_empty=True),
 
            v.ValidUserGroup(edit, old_data)
 
        )
 
        user_group_description = v.UnicodeString(strip=True, min=1,
 
                                                 not_empty=False)
 

	
 
        users_group_active = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            users_group_members = v.OneOf(
 
                available_members, hideList=False, testValueList=True,
 
                if_missing=None, not_empty=False
 
            )
 

	
 
    return _UserGroupForm
 

	
 

	
 
def RepoGroupForm(edit=False, old_data=None, repo_groups=None,
 
                   can_create_in_root=False):
 
    old_data = old_data or {}
 
    repo_groups = repo_groups or []
 
    repo_group_ids = [rg[0] for rg in repo_groups]
 
    class _RepoGroupForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 

	
 
        group_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                         v.SlugifyName(),
 
                         v.ValidRegex(msg=_('Name must not contain only digits'))(r'(?!^\d+$)^.+$'))
 
        group_description = v.UnicodeString(strip=True, min=1,
 
                                            not_empty=False)
 
        group_copy_permissions = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            #FIXME: do a special check that we cannot move a group to one of
 
            #its children
 
            pass
 

	
 
        group_parent_id = All(v.CanCreateGroup(can_create_in_root),
 
        parent_group_id = All(v.CanCreateGroup(can_create_in_root),
 
                              v.OneOf(repo_group_ids, hideList=False,
 
                                      testValueList=True,
 
                                      if_missing=None, not_empty=True),
 
                              v.Int(min=-1, not_empty=True))
 
        enable_locking = v.StringBoolean(if_missing=False)
 
        chained_validators = [v.ValidRepoGroup(edit, old_data)]
 

	
 
    return _RepoGroupForm
 

	
 

	
 
def RegisterForm(edit=False, old_data=None):
 
    class _RegisterForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(
 
            v.ValidUsername(edit, old_data),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        password = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        password_confirmation = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        active = v.StringBoolean(if_missing=False)
 
        firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
 

	
 
        chained_validators = [v.ValidPasswordsMatch('password',
 
                                                    'password_confirmation')]
 

	
 
    return _RegisterForm
 

	
 

	
 
def PasswordResetRequestForm():
 
    class _PasswordResetRequestForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        email = v.Email(not_empty=True)
 
    return _PasswordResetRequestForm
 

	
 
def PasswordResetConfirmationForm():
 
    class _PasswordResetConfirmationForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        email = v.UnicodeString(strip=True, not_empty=True)
 
        timestamp = v.Number(strip=True, not_empty=True)
 
        token = v.UnicodeString(strip=True, not_empty=True)
 
        password = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
 
        password_confirm = All(v.ValidPassword(), v.UnicodeString(strip=False, min=6))
 

	
 
        chained_validators = [v.ValidPasswordsMatch('password',
 
                                                    'password_confirm')]
 
    return _PasswordResetConfirmationForm
 

	
 
def RepoForm(edit=False, old_data=None, supported_backends=BACKENDS.keys(),
 
             repo_groups=None, landing_revs=None):
 
    old_data = old_data or {}
 
    repo_groups = repo_groups or []
 
    landing_revs = landing_revs or []
 
    repo_group_ids = [rg[0] for rg in repo_groups]
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = All(v.CanWriteGroup(old_data),
 
                         v.OneOf(repo_group_ids, hideList=True),
 
                         v.Int(min=-1, not_empty=True))
 
        repo_type = v.OneOf(supported_backends, required=False,
 
                            if_missing=old_data.get('repo_type'))
 
        repo_description = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        repo_private = v.StringBoolean(if_missing=False)
 
        repo_landing_rev = v.OneOf(landing_revs, hideList=True)
 
        repo_copy_permissions = v.StringBoolean(if_missing=False)
 
        clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
 

	
 
        repo_enable_statistics = v.StringBoolean(if_missing=False)
 
        repo_enable_downloads = v.StringBoolean(if_missing=False)
 
        repo_enable_locking = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            owner = All(v.UnicodeString(not_empty=True), v.ValidRepoUser())
 
            # Not a real field - just for reference for validation:
 
            # clone_uri_hidden = v.UnicodeString(if_missing='')
 

	
 
        chained_validators = [v.ValidCloneUri(),
 
                              v.ValidRepoName(edit, old_data)]
 
    return _RepoForm
 

	
 

	
 
def RepoPermsForm():
 
    class _RepoPermsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        chained_validators = [v.ValidPerms(type_='repo')]
 
    return _RepoPermsForm
 

	
 

	
 
def RepoGroupPermsForm(valid_recursive_choices):
 
    class _RepoGroupPermsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        recursive = v.OneOf(valid_recursive_choices)
 
        chained_validators = [v.ValidPerms(type_='repo_group')]
 
    return _RepoGroupPermsForm
 

	
 

	
 
def UserGroupPermsForm():
 
    class _UserPermsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        chained_validators = [v.ValidPerms(type_='user_group')]
 
    return _UserPermsForm
 

	
 

	
 
def RepoFieldForm():
 
    class _RepoFieldForm(formencode.Schema):
 
        filter_extra_fields = True
 
        allow_extra_fields = True
 

	
 
        new_field_key = All(v.FieldKey(),
 
                            v.UnicodeString(strip=True, min=3, not_empty=True))
 
        new_field_value = v.UnicodeString(not_empty=False, if_missing='')
 
        new_field_type = v.OneOf(['str', 'unicode', 'list', 'tuple'],
 
                                 if_missing='str')
 
        new_field_label = v.UnicodeString(not_empty=False)
 
        new_field_desc = v.UnicodeString(not_empty=False)
 

	
 
    return _RepoFieldForm
 

	
 

	
 
def RepoForkForm(edit=False, old_data=None, supported_backends=BACKENDS.keys(),
 
                 repo_groups=None, landing_revs=None):
 
    old_data = old_data or {}
 
    repo_groups = repo_groups or []
 
    landing_revs = landing_revs or []
 
    repo_group_ids = [rg[0] for rg in repo_groups]
 
    class _RepoForkForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = All(v.CanWriteGroup(),
 
                         v.OneOf(repo_group_ids, hideList=True),
 
                         v.Int(min=-1, not_empty=True))
 
        repo_type = All(v.ValidForkType(old_data), v.OneOf(supported_backends))
 
        description = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        private = v.StringBoolean(if_missing=False)
 
        copy_permissions = v.StringBoolean(if_missing=False)
 
        update_after_clone = v.StringBoolean(if_missing=False)
 
        fork_parent_id = v.UnicodeString()
 
        chained_validators = [v.ValidForkName(edit, old_data)]
 
        landing_rev = v.OneOf(landing_revs, hideList=True)
 

	
 
    return _RepoForkForm
 

	
 

	
 
def ApplicationSettingsForm():
 
    class _ApplicationSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        title = v.UnicodeString(strip=True, not_empty=False)
 
        realm = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        ga_code = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        captcha_public_key = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        captcha_private_key = v.UnicodeString(strip=True, min=1, not_empty=False)
 

	
 
    return _ApplicationSettingsForm
 

	
 

	
 
def ApplicationVisualisationForm():
 
    class _ApplicationVisualisationForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        show_public_icon = v.StringBoolean(if_missing=False)
 
        show_private_icon = v.StringBoolean(if_missing=False)
 
        stylify_metatags = v.StringBoolean(if_missing=False)
 

	
 
        repository_fields = v.StringBoolean(if_missing=False)
 
        lightweight_journal = v.StringBoolean(if_missing=False)
 
        page_size = v.Int(min=5, not_empty=True)
 
        admin_grid_items = v.Int(min=5, not_empty=True)
 
        show_version = v.StringBoolean(if_missing=False)
 
        use_gravatar = v.StringBoolean(if_missing=False)
 
        gravatar_url = v.UnicodeString(min=3)
 
        clone_uri_tmpl = v.UnicodeString(min=3)
 

	
 
    return _ApplicationVisualisationForm
 

	
 

	
 
def ApplicationUiSettingsForm():
 
    class _ApplicationUiSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        paths_root_path = All(
 
            v.ValidPath(),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        hooks_changegroup_update = v.StringBoolean(if_missing=False)
 
        hooks_changegroup_repo_size = v.StringBoolean(if_missing=False)
 
        hooks_changegroup_push_logger = v.StringBoolean(if_missing=False)
 
        hooks_outgoing_pull_logger = v.StringBoolean(if_missing=False)
 

	
 
        extensions_largefiles = v.StringBoolean(if_missing=False)
 
        extensions_hgsubversion = v.StringBoolean(if_missing=False)
 
        extensions_hggit = v.StringBoolean(if_missing=False)
 

	
 
    return _ApplicationUiSettingsForm
 

	
 

	
 
def DefaultPermissionsForm(repo_perms_choices, group_perms_choices,
 
                           user_group_perms_choices, create_choices,
 
                           create_on_write_choices, repo_group_create_choices,
 
                           user_group_create_choices, fork_choices,
 
                           register_choices, extern_activate_choices):
 
    class _DefaultPermissionsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        overwrite_default_repo = v.StringBoolean(if_missing=False)
 
        overwrite_default_group = v.StringBoolean(if_missing=False)
 
        overwrite_default_user_group = v.StringBoolean(if_missing=False)
 
        anonymous = v.StringBoolean(if_missing=False)
 
        default_repo_perm = v.OneOf(repo_perms_choices)
 
        default_group_perm = v.OneOf(group_perms_choices)
 
        default_user_group_perm = v.OneOf(user_group_perms_choices)
 

	
 
        default_repo_create = v.OneOf(create_choices)
 
        create_on_write = v.OneOf(create_on_write_choices)
 
        default_user_group_create = v.OneOf(user_group_create_choices)
 
        #default_repo_group_create = v.OneOf(repo_group_create_choices) #not impl. yet
 
        default_fork = v.OneOf(fork_choices)
 

	
 
        default_register = v.OneOf(register_choices)
 
        default_extern_activate = v.OneOf(extern_activate_choices)
 
    return _DefaultPermissionsForm
 

	
 

	
 
def CustomDefaultPermissionsForm():
 
    class _CustomDefaultPermissionsForm(formencode.Schema):
 
        filter_extra_fields = True
 
        allow_extra_fields = True
 
        inherit_default_permissions = v.StringBoolean(if_missing=False)
 

	
 
        create_repo_perm = v.StringBoolean(if_missing=False)
 
        create_user_group_perm = v.StringBoolean(if_missing=False)
 
        #create_repo_group_perm Impl. later
 

	
 
        fork_repo_perm = v.StringBoolean(if_missing=False)
 

	
 
    return _CustomDefaultPermissionsForm
 

	
 

	
 
def DefaultsForm(edit=False, old_data=None, supported_backends=BACKENDS.keys()):
 
    class _DefaultsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        default_repo_type = v.OneOf(supported_backends)
 
        default_repo_private = v.StringBoolean(if_missing=False)
 
        default_repo_enable_statistics = v.StringBoolean(if_missing=False)
 
        default_repo_enable_downloads = v.StringBoolean(if_missing=False)
 
        default_repo_enable_locking = v.StringBoolean(if_missing=False)
 

	
 
    return _DefaultsForm
 

	
 

	
 
def AuthSettingsForm(current_active_modules):
 
    class _AuthSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        auth_plugins = All(v.ValidAuthPlugins(),
 
                           v.UniqueListFromString()(not_empty=True))
 

	
 
        def __init__(self, *args, **kwargs):
 
            # The auth plugins tell us what form validators they use
 
            if current_active_modules:
 
                import kallithea.lib.auth_modules
 
                from kallithea.lib.auth_modules import LazyFormencode
 
                for module in current_active_modules:
 
                    plugin = kallithea.lib.auth_modules.loadplugin(module)
 
                    plugin_name = plugin.name
 
                    for sv in plugin.plugin_settings():
 
                        newk = "auth_%s_%s" % (plugin_name, sv["name"])
 
                        # can be a LazyFormencode object from plugin settings
 
                        validator = sv["validator"]
 
                        if isinstance(validator, LazyFormencode):
 
                            validator = validator()
 
                        #init all lazy validators from formencode.All
 
                        if isinstance(validator, All):
 
                            init_validators = []
 
                            for validator in validator.validators:
 
                                if isinstance(validator, LazyFormencode):
 
                                    validator = validator()
 
                                init_validators.append(validator)
 
                            validator.validators = init_validators
 

	
 
                        self.add_field(newk, validator)
 
            formencode.Schema.__init__(self, *args, **kwargs)
 

	
 
    return _AuthSettingsForm
 

	
 

	
 
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices,
 
                     tls_kind_choices):
 
    class _LdapSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        #pre_validators = [LdapLibValidator]
 
        ldap_active = v.StringBoolean(if_missing=False)
 
        ldap_host = v.UnicodeString(strip=True,)
 
        ldap_port = v.Number(strip=True,)
 
        ldap_tls_kind = v.OneOf(tls_kind_choices)
 
        ldap_tls_reqcert = v.OneOf(tls_reqcert_choices)
 
        ldap_dn_user = v.UnicodeString(strip=True,)
 
        ldap_dn_pass = v.UnicodeString(strip=True,)
 
        ldap_base_dn = v.UnicodeString(strip=True,)
 
        ldap_filter = v.UnicodeString(strip=True,)
 
        ldap_search_scope = v.OneOf(search_scope_choices)
 
        ldap_attr_login = v.AttrLoginValidator()(not_empty=True)
 
        ldap_attr_firstname = v.UnicodeString(strip=True,)
 
        ldap_attr_lastname = v.UnicodeString(strip=True,)
 
        ldap_attr_email = v.UnicodeString(strip=True,)
 

	
 
    return _LdapSettingsForm
 

	
 

	
 
def UserExtraEmailForm():
 
    class _UserExtraEmailForm(formencode.Schema):
 
        email = All(v.UniqSystemEmail(), v.Email(not_empty=True))
 
    return _UserExtraEmailForm
 

	
 

	
 
def UserExtraIpForm():
 
    class _UserExtraIpForm(formencode.Schema):
 
        ip = v.ValidIp()(not_empty=True)
 
    return _UserExtraIpForm
 

	
 

	
 
def PullRequestForm(repo_id):
 
    class _PullRequestForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        org_repo = v.UnicodeString(strip=True, required=True)
 
        org_ref = v.UnicodeString(strip=True, required=True)
 
        other_repo = v.UnicodeString(strip=True, required=True)
 
        other_ref = v.UnicodeString(strip=True, required=True)
 

	
 
        pullrequest_title = v.UnicodeString(strip=True, required=True)
 
        pullrequest_desc = v.UnicodeString(strip=True, required=False)
 

	
 
    return _PullRequestForm
 

	
 

	
 
def PullRequestPostForm():
 
    class _PullRequestPostForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        pullrequest_title = v.UnicodeString(strip=True, required=True)
 
        pullrequest_desc = v.UnicodeString(strip=True, required=False)
 
        org_review_members = v.Set()
 
        review_members = v.Set()
 
        updaterev = v.UnicodeString(strip=True, required=False, if_missing=None)
 
        owner = All(v.UnicodeString(strip=True, required=True),
 
                    v.ValidRepoUser())
 

	
 
    return _PullRequestPostForm
 

	
 

	
 
def GistForm(lifetime_options):
 
    class _GistForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        filename = All(v.BasePath()(),
 
                       v.UnicodeString(strip=True, required=False))
 
        description = v.UnicodeString(required=False, if_missing=u'')
 
        lifetime = v.OneOf(lifetime_options)
 
        mimetype = v.UnicodeString(required=False, if_missing=None)
 
        content = v.UnicodeString(required=True, not_empty=True)
 
        public = v.UnicodeString(required=False, if_missing=u'')
 
        private = v.UnicodeString(required=False, if_missing=u'')
 

	
 
    return _GistForm
kallithea/model/repo_group.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.repo_group
 
~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
repo group model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 25, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import os
 
import logging
 
import traceback
 
import shutil
 
import datetime
 

	
 
from kallithea.lib.utils2 import LazyProperty
 

	
 
from kallithea.model.base import BaseModel
 
from kallithea.model.db import RepoGroup, Ui, UserRepoGroupToPerm, \
 
    User, Permission, UserGroupRepoGroupToPerm, UserGroup, Repository
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoGroupModel(BaseModel):
 

	
 
    def _get_user_group(self, users_group):
 
        return UserGroup.guess_instance(users_group,
 
                                  callback=UserGroup.get_by_group_name)
 

	
 
    def _get_repo_group(self, repo_group):
 
        return RepoGroup.guess_instance(repo_group,
 
                                  callback=RepoGroup.get_by_group_name)
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = Ui.get_by_key('paths', '/')
 
        return q.ui_value
 

	
 
    def _create_default_perms(self, new_group):
 
        # create default permission
 
        default_perm = 'group.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('group.'):
 
                default_perm = p.permission.permission_name
 
                break
 

	
 
        repo_group_to_perm = UserRepoGroupToPerm()
 
        repo_group_to_perm.permission = Permission.get_by_key(default_perm)
 

	
 
        repo_group_to_perm.group = new_group
 
        repo_group_to_perm.user_id = def_user.user_id
 
        return repo_group_to_perm
 

	
 
    def _create_group(self, group_name):
 
        """
 
        makes repository group on filesystem
 

	
 
        :param repo_name:
 
        :param parent_id:
 
        """
 

	
 
        create_path = os.path.join(self.repos_path, group_name)
 
        log.debug('creating new group in %s', create_path)
 

	
 
        if os.path.isdir(create_path):
 
            raise Exception('That directory already exists !')
 

	
 
        os.makedirs(create_path)
 
        log.debug('Created group in %s', create_path)
 

	
 
    def _rename_group(self, old, new):
 
        """
 
        Renames a group on filesystem
 

	
 
        :param group_name:
 
        """
 

	
 
        if old == new:
 
            log.debug('skipping group rename')
 
            return
 

	
 
        log.debug('renaming repository group from %s to %s', old, new)
 

	
 
        old_path = os.path.join(self.repos_path, old)
 
        new_path = os.path.join(self.repos_path, new)
 

	
 
        log.debug('renaming repos paths from %s to %s', old_path, new_path)
 

	
 
        if os.path.isdir(new_path):
 
            raise Exception('Was trying to rename to already '
 
                            'existing dir %s' % new_path)
 
        shutil.move(old_path, new_path)
 

	
 
    def _delete_group(self, group, force_delete=False):
 
        """
 
        Deletes a group from a filesystem
 

	
 
        :param group: instance of group from database
 
        :param force_delete: use shutil rmtree to remove all objects
 
        """
 
        paths = group.full_path.split(RepoGroup.url_sep())
 
        paths = os.sep.join(paths)
 

	
 
        rm_path = os.path.join(self.repos_path, paths)
 
        log.info("Removing group %s", rm_path)
 
        # delete only if that path really exists
 
        if os.path.isdir(rm_path):
 
            if force_delete:
 
                shutil.rmtree(rm_path)
 
            else:
 
                #archive that group`
 
                _now = datetime.datetime.now()
 
                _ms = str(_now.microsecond).rjust(6, '0')
 
                _d = 'rm__%s_GROUP_%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                                          group.name)
 
                shutil.move(rm_path, os.path.join(self.repos_path, _d))
 

	
 
    def create(self, group_name, group_description, owner, parent=None,
 
               just_db=False, copy_permissions=False):
 
        try:
 
            owner = self._get_user(owner)
 
            parent_group = self._get_repo_group(parent)
 
            new_repo_group = RepoGroup()
 
            new_repo_group.owner = owner
 
            new_repo_group.group_description = group_description or group_name
 
            new_repo_group.parent_group = parent_group
 
            new_repo_group.group_name = new_repo_group.get_new_name(group_name)
 

	
 
            self.sa.add(new_repo_group)
 

	
 
            # create an ADMIN permission for owner except if we're super admin,
 
            # later owner should go into the owner field of groups
 
            if not owner.is_admin:
 
                self.grant_user_permission(repo_group=new_repo_group,
 
                                           user=owner, perm='group.admin')
 

	
 
            if parent_group and copy_permissions:
 
                # copy permissions from parent
 
                user_perms = UserRepoGroupToPerm.query() \
 
                    .filter(UserRepoGroupToPerm.group == parent_group).all()
 

	
 
                group_perms = UserGroupRepoGroupToPerm.query() \
 
                    .filter(UserGroupRepoGroupToPerm.group == parent_group).all()
 

	
 
                for perm in user_perms:
 
                    # don't copy over the permission for user who is creating
 
                    # this group, if he is not super admin he get's admin
 
                    # permission set above
 
                    if perm.user != owner or owner.is_admin:
 
                        UserRepoGroupToPerm.create(perm.user, new_repo_group, perm.permission)
 

	
 
                for perm in group_perms:
 
                    UserGroupRepoGroupToPerm.create(perm.users_group, new_repo_group, perm.permission)
 
            else:
 
                perm_obj = self._create_default_perms(new_repo_group)
 
                self.sa.add(perm_obj)
 

	
 
            if not just_db:
 
                # we need to flush here, in order to check if database won't
 
                # throw any exceptions, create filesystem dirs at the very end
 
                self.sa.flush()
 
                self._create_group(new_repo_group.group_name)
 

	
 
            return new_repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _update_permissions(self, repo_group, perms_new=None,
 
                            perms_updates=None, recursive=None,
 
                            check_perms=True):
 
        from kallithea.model.repo import RepoModel
 
        from kallithea.lib.auth import HasUserGroupPermissionAny
 

	
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        def _set_perm_user(obj, user, perm):
 
            if isinstance(obj, RepoGroup):
 
                self.grant_user_permission(repo_group=obj, user=user, perm=perm)
 
            elif isinstance(obj, Repository):
 
                # private repos will not allow to change the default permissions
 
                # using recursive mode
 
                if obj.private and user == User.DEFAULT_USER:
 
                    return
 

	
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_user_permission(
 
                    repo=obj, user=user, perm=perm
 
                )
 

	
 
        def _set_perm_group(obj, users_group, perm):
 
            if isinstance(obj, RepoGroup):
 
                self.grant_user_group_permission(repo_group=obj,
 
                                                  group_name=users_group,
 
                                                  perm=perm)
 
            elif isinstance(obj, Repository):
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_user_group_permission(
 
                    repo=obj, group_name=users_group, perm=perm
 
                )
 

	
 
        # start updates
 
        updates = []
 
        log.debug('Now updating permissions for %s in recursive mode:%s',
 
                  repo_group, recursive)
 

	
 
        for obj in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                obj = obj
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(obj, RepoGroup) and not obj == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(obj, Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                obj = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for user group
 
                else:
 
                    #check if we have permissions to alter this usergroup
 
                    req_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin')
 
                    if not check_perms or HasUserGroupPermissionAny(*req_perms)(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    #check if we have permissions to alter this usergroup
 
                    req_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin')
 
                    if not check_perms or HasUserGroupPermissionAny(*req_perms)(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
        return updates
 

	
 
    def update(self, repo_group, form_data):
 

	
 
        try:
 
            repo_group = self._get_repo_group(repo_group)
 
            old_path = repo_group.full_path
 

	
 
            # change properties
 
            repo_group.group_description = form_data['group_description']
 
            repo_group.group_parent_id = form_data['group_parent_id']
 
            repo_group.parent_group_id = form_data['parent_group_id']
 
            repo_group.enable_locking = form_data['enable_locking']
 

	
 
            repo_group.parent_group = RepoGroup.get(form_data['group_parent_id'])
 
            repo_group.parent_group = RepoGroup.get(form_data['parent_group_id'])
 
            repo_group.group_name = repo_group.get_new_name(form_data['group_name'])
 
            new_path = repo_group.full_path
 
            self.sa.add(repo_group)
 

	
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
 
            # to the parent
 
            # if obj is a Repo fix it's name
 
            # this can be potentially heavy operation
 
            for obj in repo_group.recursive_groups_and_repos():
 
                #set the value from it's parent
 
                obj.enable_locking = repo_group.enable_locking
 
                if isinstance(obj, RepoGroup):
 
                    new_name = obj.get_new_name(obj.name)
 
                    log.debug('Fixing group %s to new name %s' \
 
                                % (obj.group_name, new_name))
 
                    obj.group_name = new_name
 
                elif isinstance(obj, Repository):
 
                    # we need to get all repositories from this new group and
 
                    # rename them accordingly to new group path
 
                    new_name = obj.get_new_name(obj.just_name)
 
                    log.debug('Fixing repo %s to new name %s' \
 
                                % (obj.repo_name, new_name))
 
                    obj.repo_name = new_name
 
                self.sa.add(obj)
 

	
 
            self._rename_group(old_path, new_path)
 

	
 
            return repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, repo_group, force_delete=False):
 
        repo_group = self._get_repo_group(repo_group)
 
        try:
 
            self.sa.delete(repo_group)
 
            self._delete_group(repo_group, force_delete)
 
        except Exception:
 
            log.error('Error removing repo_group %s', repo_group)
 
            raise
 

	
 
    def add_permission(self, repo_group, obj, obj_type, perm, recursive):
 
        from kallithea.model.repo import RepoModel
 
        repo_group = self._get_repo_group(repo_group)
 
        perm = self._get_perm(perm)
 

	
 
        for el in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                el = el
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(el, RepoGroup) and not el == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(el, Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                el = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            if isinstance(el, RepoGroup):
 
                if obj_type == 'user':
 
                    RepoGroupModel().grant_user_permission(el, user=obj, perm=perm)
 
                elif obj_type == 'user_group':
 
                    RepoGroupModel().grant_user_group_permission(el, group_name=obj, perm=perm)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            elif isinstance(el, Repository):
 
                # for repos we need to hotfix the name of permission
 
                _perm = perm.permission_name.replace('group.', 'repository.')
 
                if obj_type == 'user':
 
                    RepoModel().grant_user_permission(el, user=obj, perm=_perm)
 
                elif obj_type == 'user_group':
 
                    RepoModel().grant_user_group_permission(el, group_name=obj, perm=_perm)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            else:
 
                raise Exception('el should be instance of Repository or '
 
                                'RepositoryGroup got %s instead' % type(el))
 

	
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
    def delete_permission(self, repo_group, obj, obj_type, recursive):
 
        """
 
        Revokes permission for repo_group for given obj(user or users_group),
 
        obj_type can be user or user group
 

	
 
        :param repo_group:
 
        :param obj: user or user group id
 
        :param obj_type: user or user group type
 
        :param recursive: recurse to all children of group
 
        """
 
        from kallithea.model.repo import RepoModel
 
        repo_group = self._get_repo_group(repo_group)
 

	
 
        for el in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                el = el
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(el, RepoGroup) and not el == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(el, Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                el = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            if isinstance(el, RepoGroup):
 
                if obj_type == 'user':
 
                    RepoGroupModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'user_group':
 
                    RepoGroupModel().revoke_user_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            elif isinstance(el, Repository):
 
                if obj_type == 'user':
 
                    RepoModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'user_group':
 
                    RepoModel().revoke_user_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            else:
 
                raise Exception('el should be instance of Repository or '
 
                                'RepositoryGroup got %s instead' % type(el))
 

	
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
    def grant_user_permission(self, repo_group, user, perm):
 
        """
 
        Grant permission for user on given repository group, or update
 
        existing one if found
 

	
 
        :param repo_group: Instance of RepoGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 

	
 
        repo_group = self._get_repo_group(repo_group)
 
        user = self._get_user(user)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserRepoGroupToPerm) \
 
            .filter(UserRepoGroupToPerm.user == user) \
 
            .filter(UserRepoGroupToPerm.group == repo_group) \
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserRepoGroupToPerm()
 
        obj.group = repo_group
 
        obj.user = user
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s', perm, user, repo_group)
 
        return obj
 

	
 
    def revoke_user_permission(self, repo_group, user):
 
        """
 
        Revoke permission for user on given repository group
 

	
 
        :param repo_group: Instance of RepoGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        repo_group = self._get_repo_group(repo_group)
 
        user = self._get_user(user)
 

	
 
        obj = self.sa.query(UserRepoGroupToPerm) \
 
            .filter(UserRepoGroupToPerm.user == user) \
 
            .filter(UserRepoGroupToPerm.group == repo_group) \
 
            .scalar()
 
        if obj is not None:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm on %s on %s', repo_group, user)
 

	
 
    def grant_user_group_permission(self, repo_group, group_name, perm):
 
        """
 
        Grant permission for user group on given repository group, or update
 
        existing one if found
 

	
 
        :param repo_group: Instance of RepoGroup, repositories_group_id,
 
            or repositories_group name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or user group name
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        repo_group = self._get_repo_group(repo_group)
 
        group_name = self._get_user_group(group_name)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserGroupRepoGroupToPerm) \
 
            .filter(UserGroupRepoGroupToPerm.group == repo_group) \
 
            .filter(UserGroupRepoGroupToPerm.users_group == group_name) \
 
            .scalar()
 

	
 
        if obj is None:
 
            # create new
 
            obj = UserGroupRepoGroupToPerm()
 

	
 
        obj.group = repo_group
 
        obj.users_group = group_name
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s', perm, group_name, repo_group)
 
        return obj
 

	
 
    def revoke_user_group_permission(self, repo_group, group_name):
 
        """
 
        Revoke permission for user group on given repository group
 

	
 
        :param repo_group: Instance of RepoGroup, repositories_group_id,
 
            or repositories_group name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or user group name
 
        """
 
        repo_group = self._get_repo_group(repo_group)
 
        group_name = self._get_user_group(group_name)
 

	
 
        obj = self.sa.query(UserGroupRepoGroupToPerm) \
 
            .filter(UserGroupRepoGroupToPerm.group == repo_group) \
 
            .filter(UserGroupRepoGroupToPerm.users_group == group_name) \
 
            .scalar()
 
        if obj is not None:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm to %s on %s', repo_group, group_name)
kallithea/model/scm.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.scm
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Scm model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import posixpath
 
import re
 
import time
 
import traceback
 
import logging
 
import cStringIO
 
import pkg_resources
 

	
 
from sqlalchemy import func
 
from pylons.i18n.translation import _
 

	
 
import kallithea
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea import BACKENDS
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url, \
 
    _set_extras
 
from kallithea.lib.auth import HasRepoPermissionAny, HasRepoGroupPermissionAny, \
 
    HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAny
 
from kallithea.lib.utils import get_filesystem_repos, make_ui, \
 
    action_logger
 
from kallithea.model.base import BaseModel
 
from kallithea.model.db import Repository, Ui, CacheInvalidation, \
 
    UserFollowing, UserLog, User, RepoGroup, PullRequest
 
from kallithea.lib.hooks import log_push_action
 
from kallithea.lib.exceptions import NonRelativePathError, IMCCommitError
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserTemp(object):
 
    def __init__(self, user_id):
 
        self.user_id = user_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.user_id)
 

	
 

	
 
class RepoTemp(object):
 
    def __init__(self, repo_id):
 
        self.repo_id = repo_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.repo_id)
 

	
 

	
 
class _PermCheckIterator(object):
 
    def __init__(self, obj_list, obj_attr, perm_set, perm_checker, extra_kwargs=None):
 
        """
 
        Creates iterator from given list of objects, additionally
 
        checking permission for them from perm_set var
 

	
 
        :param obj_list: list of db objects
 
        :param obj_attr: attribute of object to pass into perm_checker
 
        :param perm_set: list of permissions to check
 
        :param perm_checker: callable to check permissions against
 
        """
 
        self.obj_list = obj_list
 
        self.obj_attr = obj_attr
 
        self.perm_set = perm_set
 
        self.perm_checker = perm_checker
 
        self.extra_kwargs = extra_kwargs or {}
 

	
 
    def __len__(self):
 
        return len(self.obj_list)
 

	
 
    def __repr__(self):
 
        return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
 

	
 
    def __iter__(self):
 
        for db_obj in self.obj_list:
 
            # check permission at this level
 
            name = getattr(db_obj, self.obj_attr, None)
 
            if not self.perm_checker(*self.perm_set)(
 
                    name, self.__class__.__name__, **self.extra_kwargs):
 
                continue
 

	
 
            yield db_obj
 

	
 

	
 
class RepoList(_PermCheckIterator):
 

	
 
    def __init__(self, db_repo_list, perm_set=None, extra_kwargs=None):
 
        if not perm_set:
 
            perm_set = ['repository.read', 'repository.write', 'repository.admin']
 

	
 
        super(RepoList, self).__init__(obj_list=db_repo_list,
 
                    obj_attr='repo_name', perm_set=perm_set,
 
                    perm_checker=HasRepoPermissionAny,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class RepoGroupList(_PermCheckIterator):
 

	
 
    def __init__(self, db_repo_group_list, perm_set=None, extra_kwargs=None):
 
        if not perm_set:
 
            perm_set = ['group.read', 'group.write', 'group.admin']
 

	
 
        super(RepoGroupList, self).__init__(obj_list=db_repo_group_list,
 
                    obj_attr='group_name', perm_set=perm_set,
 
                    perm_checker=HasRepoGroupPermissionAny,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class UserGroupList(_PermCheckIterator):
 

	
 
    def __init__(self, db_user_group_list, perm_set=None, extra_kwargs=None):
 
        if not perm_set:
 
            perm_set = ['usergroup.read', 'usergroup.write', 'usergroup.admin']
 

	
 
        super(UserGroupList, self).__init__(obj_list=db_user_group_list,
 
                    obj_attr='users_group_name', perm_set=perm_set,
 
                    perm_checker=HasUserGroupPermissionAny,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class ScmModel(BaseModel):
 
    """
 
    Generic Scm Model
 
    """
 

	
 
    def __get_repo(self, instance):
 
        cls = Repository
 
        if isinstance(instance, cls):
 
            return instance
 
        elif isinstance(instance, int) or safe_str(instance).isdigit():
 
            return cls.get(instance)
 
        elif isinstance(instance, basestring):
 
            return cls.get_by_repo_name(instance)
 
        elif instance is not None:
 
            raise Exception('given object must be int, basestr or Instance'
 
                            ' of %s got %s' % (type(cls), type(instance)))
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = self.sa.query(Ui).filter(Ui.ui_key == '/').one()
 

	
 
        return q.ui_value
 

	
 
    def repo_scan(self, repos_path=None):
 
        """
 
        Listing of repositories in given path. This path should not be a
 
        repository itself. Return a dictionary of repository objects
 

	
 
        :param repos_path: path to directory containing repositories
 
        """
 

	
 
        if repos_path is None:
 
            repos_path = self.repos_path
 

	
 
        log.info('scanning for repositories in %s', repos_path)
 

	
 
        baseui = make_ui('db')
 
        repos = {}
 

	
 
        for name, path in get_filesystem_repos(repos_path):
 
            # name need to be decomposed and put back together using the /
 
            # since this is internal storage separator for kallithea
 
            name = Repository.normalize_repo_name(name)
 

	
 
            try:
 
                if name in repos:
 
                    raise RepositoryError('Duplicate repository name %s '
 
                                          'found in %s' % (name, path))
 
                else:
 

	
 
                    klass = get_backend(path[0])
 

	
 
                    if path[0] == 'hg' and path[0] in BACKENDS.keys():
 
                        repos[name] = klass(safe_str(path[1]), baseui=baseui)
 

	
 
                    if path[0] == 'git' and path[0] in BACKENDS.keys():
 
                        repos[name] = klass(path[1])
 
            except OSError:
 
                continue
 
        log.debug('found %s paths with repositories', len(repos))
 
        return repos
 

	
 
    def get_repos(self, repos):
 
        """Return the repos the user has access to"""
 
        return RepoList(repos)
 

	
 
    def get_repo_groups(self, groups=None):
 
        """Return the repo groups the user has access to
 
        If no groups are specified, use top level groups.
 
        """
 
        if groups is None:
 
            groups = RepoGroup.query() \
 
                .filter(RepoGroup.group_parent_id == None).all()
 
                .filter(RepoGroup.parent_group_id == None).all()
 
        return RepoGroupList(groups)
 

	
 
    def mark_for_invalidation(self, repo_name):
 
        """
 
        Mark caches of this repo invalid in the database.
 

	
 
        :param repo_name: the repo for which caches should be marked invalid
 
        """
 
        CacheInvalidation.set_invalidate(repo_name)
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo is not None:
 
            repo.update_changeset_cache()
 

	
 
    def toggle_following_repo(self, follow_repo_id, user_id):
 

	
 
        f = self.sa.query(UserFollowing) \
 
            .filter(UserFollowing.follows_repo_id == follow_repo_id) \
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        if f is not None:
 
            try:
 
                self.sa.delete(f)
 
                action_logger(UserTemp(user_id),
 
                              'stopped_following_repo',
 
                              RepoTemp(follow_repo_id))
 
                return
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise
 

	
 
        try:
 
            f = UserFollowing()
 
            f.user_id = user_id
 
            f.follows_repo_id = follow_repo_id
 
            self.sa.add(f)
 

	
 
            action_logger(UserTemp(user_id),
 
                          'started_following_repo',
 
                          RepoTemp(follow_repo_id))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def toggle_following_user(self, follow_user_id, user_id):
 
        f = self.sa.query(UserFollowing) \
 
            .filter(UserFollowing.follows_user_id == follow_user_id) \
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        if f is not None:
 
            try:
 
                self.sa.delete(f)
 
                return
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise
 

	
 
        try:
 
            f = UserFollowing()
 
            f.user_id = user_id
 
            f.follows_user_id = follow_user_id
 
            self.sa.add(f)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def is_following_repo(self, repo_name, user_id, cache=False):
 
        r = self.sa.query(Repository) \
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        f = self.sa.query(UserFollowing) \
 
            .filter(UserFollowing.follows_repository == r) \
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        return f is not None
 

	
 
    def is_following_user(self, username, user_id, cache=False):
 
        u = User.get_by_username(username)
 

	
 
        f = self.sa.query(UserFollowing) \
 
            .filter(UserFollowing.follows_user == u) \
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        return f is not None
 

	
 
    def get_followers(self, repo):
 
        repo = self._get_repo(repo)
 

	
 
        return self.sa.query(UserFollowing) \
 
                .filter(UserFollowing.follows_repository == repo).count()
 

	
 
    def get_forks(self, repo):
 
        repo = self._get_repo(repo)
 
        return self.sa.query(Repository) \
 
                .filter(Repository.fork == repo).count()
 

	
 
    def get_pull_requests(self, repo):
 
        repo = self._get_repo(repo)
 
        return self.sa.query(PullRequest) \
 
                .filter(PullRequest.other_repo == repo) \
 
                .filter(PullRequest.status != PullRequest.STATUS_CLOSED).count()
 

	
 
    def mark_as_fork(self, repo, fork, user):
 
        repo = self.__get_repo(repo)
 
        fork = self.__get_repo(fork)
 
        if fork and repo.repo_id == fork.repo_id:
 
            raise Exception("Cannot set repository as fork of itself")
 

	
 
        if fork and repo.repo_type != fork.repo_type:
 
            raise RepositoryError("Cannot set repository as fork of repository with other type")
 

	
 
        repo.fork = fork
 
        self.sa.add(repo)
 
        return repo
 

	
 
    def _handle_rc_scm_extras(self, username, repo_name, repo_alias,
 
                              action=None):
 
        from kallithea import CONFIG
 
        from kallithea.lib.base import _get_ip_addr
 
        try:
 
            from pylons import request
 
            environ = request.environ
 
        except TypeError:
 
            # we might use this outside of request context, let's fake the
 
            # environ data
 
            from webob import Request
 
            environ = Request.blank('').environ
 
        extras = {
 
            'ip': _get_ip_addr(environ),
 
            'username': username,
 
            'action': action or 'push_local',
 
            'repository': repo_name,
 
            'scm': repo_alias,
 
            'config': CONFIG['__file__'],
 
            'server_url': get_server_url(environ),
 
            'make_lock': None,
 
            'locked_by': [None, None]
 
        }
 
        _set_extras(extras)
 

	
 
    def _handle_push(self, repo, username, action, repo_name, revisions):
 
        """
 
        Triggers push action hooks
 

	
 
        :param repo: SCM repo
 
        :param username: username who pushes
 
        :param action: push/push_local/push_remote
 
        :param repo_name: name of repo
 
        :param revisions: list of revisions that we pushed
 
        """
 
        self._handle_rc_scm_extras(username, repo_name, repo_alias=repo.alias)
 
        _scm_repo = repo._repo
 
        # trigger push hook
 
        if repo.alias == 'hg':
 
            log_push_action(_scm_repo.ui, _scm_repo, node=revisions[0])
 
        elif repo.alias == 'git':
 
            log_push_action(None, _scm_repo, _git_revs=revisions)
 

	
 
    def _get_IMC_module(self, scm_type):
 
        """
 
        Returns InMemoryCommit class based on scm_type
 

	
 
        :param scm_type:
 
        """
 
        if scm_type == 'hg':
 
            from kallithea.lib.vcs.backends.hg import MercurialInMemoryChangeset
 
            return MercurialInMemoryChangeset
 

	
 
        if scm_type == 'git':
 
            from kallithea.lib.vcs.backends.git import GitInMemoryChangeset
 
            return GitInMemoryChangeset
 

	
 
        raise Exception('Invalid scm_type, must be one of hg,git got %s'
 
                        % (scm_type,))
 

	
 
    def pull_changes(self, repo, username):
 
        """
 
        Pull from "clone URL".
 
        """
 
        dbrepo = self.__get_repo(repo)
 
        clone_uri = dbrepo.clone_uri
 
        if not clone_uri:
 
            raise Exception("This repository doesn't have a clone uri")
 

	
 
        repo = dbrepo.scm_instance
 
        repo_name = dbrepo.repo_name
 
        try:
 
            if repo.alias == 'git':
 
                repo.fetch(clone_uri)
 
                # git doesn't really have something like post-fetch action
 
                # we fake that now. #TODO: extract fetched revisions somehow
 
                # here
 
                self._handle_push(repo,
 
                                  username=username,
 
                                  action='push_remote',
 
                                  repo_name=repo_name,
 
                                  revisions=[])
 
            else:
 
                self._handle_rc_scm_extras(username, dbrepo.repo_name,
 
                                           repo.alias, action='push_remote')
 
                repo.pull(clone_uri)
 

	
 
            self.mark_for_invalidation(repo_name)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def commit_change(self, repo, repo_name, cs, user, author, message,
 
                      content, f_path):
 
        """
 
        Commit a change to a single file
 

	
 
        :param repo: a db_repo.scm_instance
 
        """
 
        user = self._get_user(user)
 
        IMC = self._get_IMC_module(repo.alias)
 

	
 
        # decoding here will force that we have proper encoded values
 
        # in any other case this will throw exceptions and deny commit
 
        content = safe_str(content)
 
        path = safe_str(f_path)
 
        # message and author needs to be unicode
 
        # proper backend should then translate that into required type
 
        message = safe_unicode(message)
 
        author = safe_unicode(author)
 
        imc = IMC(repo)
 
        imc.change(FileNode(path, content, mode=cs.get_file_mode(f_path)))
 
        try:
 
            tip = imc.commit(message=message, author=author,
 
                             parents=[cs], branch=cs.branch)
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            raise IMCCommitError(str(e))
 
        finally:
 
            # always clear caches, if commit fails we want fresh object also
 
            self.mark_for_invalidation(repo_name)
 
        self._handle_push(repo,
 
                          username=user.username,
 
                          action='push_local',
 
                          repo_name=repo_name,
 
                          revisions=[tip.raw_id])
 
        return tip
 

	
 
    def _sanitize_path(self, f_path):
 
        if f_path.startswith('/') or f_path.startswith('.') or '../' in f_path:
 
            raise NonRelativePathError('%s is not an relative path' % f_path)
 
        if f_path:
 
            f_path = posixpath.normpath(f_path)
 
        return f_path
 

	
 
    def get_nodes(self, repo_name, revision, root_path='/', flat=True):
 
        """
 
        Recursively walk root dir and return a set of all paths found.
 

	
 
        :param repo_name: name of repository
 
        :param revision: revision for which to list nodes
 
        :param root_path: root path to list
 
        :param flat: return as a list, if False returns a dict with description
 

	
 
        """
 
        _files = list()
 
        _dirs = list()
 
        try:
 
            _repo = self.__get_repo(repo_name)
 
            changeset = _repo.scm_instance.get_changeset(revision)
 
            root_path = root_path.lstrip('/')
 
            for topnode, dirs, files in changeset.walk(root_path):
 
                for f in files:
 
                    _files.append(f.path if flat else {"name": f.path,
 
                                                       "type": "file"})
 
                for d in dirs:
 
                    _dirs.append(d.path if flat else {"name": d.path,
 
                                                      "type": "dir"})
 
        except RepositoryError:
 
            log.debug(traceback.format_exc())
 
            raise
 

	
 
        return _dirs, _files
 

	
 
    def create_nodes(self, user, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Commits specified nodes to repo.
 

	
 
        :param user: Kallithea User object or user_id, the committer
 
        :param repo: Kallithea Repository object
 
        :param message: commit message
 
        :param nodes: mapping {filename:{'content':content},...}
 
        :param parent_cs: parent changeset, can be empty than it's initial commit
 
        :param author: author of commit, cna be different that committer only for git
 
        :param trigger_push_hook: trigger push hooks
 

	
 
        :returns: new committed changeset
 
        """
 

	
 
        user = self._get_user(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        processed_nodes = []
 
        for f_path in nodes:
 
            content = nodes[f_path]['content']
 
            f_path = self._sanitize_path(f_path)
 
            f_path = safe_str(f_path)
 
            # decoding here will force that we have proper encoded values
 
            # in any other case this will throw exceptions and deny commit
 
            if isinstance(content, (basestring,)):
 
                content = safe_str(content)
 
            elif isinstance(content, (file, cStringIO.OutputType,)):
 
                content = content.read()
 
            else:
 
                raise Exception('Content is of unrecognized type %s' % (
 
                    type(content)
 
                ))
 
            processed_nodes.append((f_path, content))
 

	
 
        message = safe_unicode(message)
 
        committer = user.full_contact
 
        author = safe_unicode(author) if author else committer
 

	
 
        IMC = self._get_IMC_module(scm_instance.alias)
 
        imc = IMC(scm_instance)
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 
        # add multiple nodes
 
        for path, content in processed_nodes:
 
            imc.add(FileNode(path, content=content))
 

	
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        self.mark_for_invalidation(repo.repo_name)
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        return tip
 

	
 
    def update_nodes(self, user, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Commits specified nodes to repo. Again.
 
        """
 
        user = self._get_user(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        message = safe_unicode(message)
 
        committer = user.full_contact
 
        author = safe_unicode(author) if author else committer
 

	
 
        imc_class = self._get_IMC_module(scm_instance.alias)
 
        imc = imc_class(scm_instance)
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 

	
 
        # add multiple nodes
 
        for _filename, data in nodes.items():
 
            # new filename, can be renamed from the old one
 
            filename = self._sanitize_path(data['filename'])
 
            old_filename = self._sanitize_path(_filename)
 
            content = data['content']
 

	
 
            filenode = FileNode(old_filename, content=content)
 
            op = data['op']
 
            if op == 'add':
 
                imc.add(filenode)
 
            elif op == 'del':
 
                imc.remove(filenode)
 
            elif op == 'mod':
 
                if filename != old_filename:
 
                    #TODO: handle renames, needs vcs lib changes
 
                    imc.remove(filenode)
 
                    imc.add(FileNode(filename, content=content))
 
                else:
 
                    imc.change(filenode)
 

	
 
        # commit changes
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        self.mark_for_invalidation(repo.repo_name)
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 

	
 
    def delete_nodes(self, user, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Deletes specified nodes from repo.
 

	
 
        :param user: Kallithea User object or user_id, the committer
 
        :param repo: Kallithea Repository object
 
        :param message: commit message
 
        :param nodes: mapping {filename:{'content':content},...}
 
        :param parent_cs: parent changeset, can be empty than it's initial commit
 
        :param author: author of commit, cna be different that committer only for git
 
        :param trigger_push_hook: trigger push hooks
 

	
 
        :returns: new committed changeset after deletion
 
        """
 

	
 
        user = self._get_user(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        processed_nodes = []
 
        for f_path in nodes:
 
            f_path = self._sanitize_path(f_path)
 
            # content can be empty but for compatibility it allows same dicts
 
            # structure as add_nodes
 
            content = nodes[f_path].get('content')
 
            processed_nodes.append((f_path, content))
 

	
 
        message = safe_unicode(message)
 
        committer = user.full_contact
 
        author = safe_unicode(author) if author else committer
 

	
 
        IMC = self._get_IMC_module(scm_instance.alias)
 
        imc = IMC(scm_instance)
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 
        # add multiple nodes
 
        for path, content in processed_nodes:
 
            imc.remove(FileNode(path, content=content))
 

	
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        self.mark_for_invalidation(repo.repo_name)
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        return tip
 

	
 
    def get_unread_journal(self):
 
        return self.sa.query(UserLog).count()
 

	
 
    def get_repo_landing_revs(self, repo=None):
 
        """
 
        Generates select option with tags branches and bookmarks (for hg only)
 
        grouped by type
 

	
 
        :param repo:
 
        """
 

	
 
        hist_l = []
 
        choices = []
 
        repo = self.__get_repo(repo)
 
        hist_l.append(['rev:tip', _('latest tip')])
 
        choices.append('rev:tip')
 
        if repo is None:
 
            return choices, hist_l
 

	
 
        repo = repo.scm_instance
 

	
 
        branches_group = ([(u'branch:%s' % k, k) for k, v in
 
                           repo.branches.iteritems()], _("Branches"))
 
        hist_l.append(branches_group)
 
        choices.extend([x[0] for x in branches_group[0]])
 

	
 
        if repo.alias == 'hg':
 
            bookmarks_group = ([(u'book:%s' % k, k) for k, v in
 
                                repo.bookmarks.iteritems()], _("Bookmarks"))
 
            hist_l.append(bookmarks_group)
 
            choices.extend([x[0] for x in bookmarks_group[0]])
 

	
 
        tags_group = ([(u'tag:%s' % k, k) for k, v in
 
                       repo.tags.iteritems()], _("Tags"))
 
        hist_l.append(tags_group)
 
        choices.extend([x[0] for x in tags_group[0]])
 

	
 
        return choices, hist_l
 

	
 
    def install_git_hooks(self, repo, force_create=False):
 
        """
 
        Creates a kallithea hook inside a git repository
 

	
 
        :param repo: Instance of VCS repo
 
        :param force_create: Create even if same name hook exists
 
        """
 

	
 
        loc = os.path.join(repo.path, 'hooks')
 
        if not repo.bare:
 
            loc = os.path.join(repo.path, '.git', 'hooks')
 
        if not os.path.isdir(loc):
 
            os.makedirs(loc)
 

	
 
        tmpl_post = "#!/usr/bin/env %s\n" % sys.executable or 'python2'
 
        tmpl_post += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'post_receive_tmpl.py')
 
        )
 
        tmpl_pre = "#!/usr/bin/env %s\n" % sys.executable or 'python2'
 
        tmpl_pre += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'pre_receive_tmpl.py')
 
        )
 

	
 
        for h_type, tmpl in [('pre', tmpl_pre), ('post', tmpl_post)]:
 
            _hook_file = os.path.join(loc, '%s-receive' % h_type)
 
            has_hook = False
 
            log.debug('Installing git hook in repo %s', repo)
 
            if os.path.exists(_hook_file):
 
                # let's take a look at this hook, maybe it's kallithea ?
 
                log.debug('hook exists, checking if it is from kallithea')
 
                with open(_hook_file, 'rb') as f:
 
                    data = f.read()
 
                    matches = re.compile(r'(?:%s)\s*=\s*(.*)'
 
                                         % 'KALLITHEA_HOOK_VER').search(data)
 
                    if matches:
 
                        try:
 
                            ver = matches.groups()[0]
 
                            log.debug('got %s it is kallithea', ver)
 
                            has_hook = True
 
                        except Exception:
 
                            log.error(traceback.format_exc())
 
            else:
 
                # there is no hook in this dir, so we want to create one
 
                has_hook = True
 

	
 
            if has_hook or force_create:
 
                log.debug('writing %s hook file !', h_type)
 
                try:
 
                    with open(_hook_file, 'wb') as f:
 
                        tmpl = tmpl.replace('_TMPL_', kallithea.__version__)
 
                        f.write(tmpl)
 
                    os.chmod(_hook_file, 0755)
 
                except IOError as e:
 
                    log.error('error writing %s: %s', _hook_file, e)
 
            else:
 
                log.debug('skipping writing hook file')
 

	
 
def AvailableRepoGroupChoices(top_perms, repo_group_perms, extras=()):
 
    """Return group_id,string tuples with choices for all the repo groups where
 
    the user has the necessary permissions.
 

	
 
    Top level is -1.
 
    """
 
    groups = RepoGroup.query().all()
 
    if HasPermissionAny('hg.admin')('available repo groups'):
 
        groups.append(None)
 
    else:
 
        groups = list(RepoGroupList(groups, perm_set=repo_group_perms))
 
        if top_perms and HasPermissionAny(*top_perms)('available repo groups'):
 
            groups.append(None)
 
        for extra in extras:
 
            if not any(rg == extra for rg in groups):
 
                groups.append(extra)
 
    return RepoGroup.groups_choices(groups=groups)
kallithea/model/validators.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Set of generic validators
 
"""
 

	
 
import os
 
import re
 
import formencode
 
import logging
 
from collections import defaultdict
 
from pylons.i18n.translation import _
 
from webhelpers.pylonslib.secure_form import authentication_token
 
import sqlalchemy
 

	
 
from formencode.validators import (
 
    UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 
)
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib import ipaddr
 
from kallithea.lib.utils import repo_name_slug
 
from kallithea.lib.utils2 import str2bool, aslist
 
from kallithea.model.db import RepoGroup, Repository, UserGroup, User
 
from kallithea.lib.exceptions import LdapImportError
 
from kallithea.config.routing import ADMIN_PREFIX
 
from kallithea.lib.auth import HasRepoGroupPermissionAny, HasPermissionAny
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class StateObj(object):
 
    """
 
    this is needed to translate the messages using _() in validators
 
    """
 
    _ = staticmethod(_)
 

	
 

	
 
def M(self, key, state=None, **kwargs):
 
    """
 
    returns string from self.message based on given key,
 
    passed kw params are used to substitute %(named)s params inside
 
    translated strings
 

	
 
    :param msg:
 
    :param state:
 
    """
 
    if state is None:
 
        state = StateObj()
 
    else:
 
        state._ = staticmethod(_)
 
    #inject validator into state object
 
    return self.message(key, state, **kwargs)
 

	
 

	
 
def UniqueListFromString():
 
    class _UniqueListFromString(formencode.FancyValidator):
 
        """
 
        Split value on ',' and make unique while preserving order
 
        """
 
        messages = dict(
 
            empty=_('Value cannot be an empty list'),
 
            missing_value=_('Value cannot be an empty list'),
 
        )
 

	
 
        def _to_python(self, value, state):
 
            value = aslist(value, ',')
 
            seen = set()
 
            return [c for c in value if not (c in seen or seen.add(c))]
 

	
 
        def empty_value(self, value):
 
            return []
 

	
 
    return _UniqueListFromString
 

	
 

	
 
def ValidUsername(edit=False, old_data=None):
 
    old_data = old_data or {}
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'username_exists': _('Username "%(username)s" already exists'),
 
            'system_invalid_username':
 
                _('Username "%(username)s" cannot be used'),
 
            'invalid_username':
 
                _('Username may only contain alphanumeric characters '
 
                  'underscores, periods or dashes and must begin with an '
 
                  'alphanumeric character or underscore')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value in ['default', 'new_user']:
 
                msg = M(self, 'system_invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state)
 
            #check if user is unique
 
            old_un = None
 
            if edit:
 
                old_un = User.get(old_data.get('user_id')).username
 

	
 
            if old_un != value or not edit:
 
                if User.get_by_username(value, case_insensitive=True):
 
                    msg = M(self, 'username_exists', state, username=value)
 
                    raise formencode.Invalid(msg, value, state)
 

	
 
            if re.match(r'^[a-zA-Z0-9\_]{1}[a-zA-Z0-9\-\_\.]*$', value) is None:
 
                msg = M(self, 'invalid_username', state)
 
                raise formencode.Invalid(msg, value, state)
 
    return _validator
 

	
 

	
 
def ValidRegex(msg=None):
 
    class _validator(formencode.validators.Regex):
 
        messages = dict(invalid=msg or _('The input is not valid'))
 
    return _validator
 

	
 

	
 
def ValidRepoUser():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_username': _('Username %(username)s is not valid')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                User.query().filter(User.active == True) \
 
                    .filter(User.username == value).one()
 
            except sqlalchemy.exc.InvalidRequestError: # NoResultFound/MultipleResultsFound
 
                msg = M(self, 'invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(username=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidUserGroup(edit=False, old_data=None):
 
    old_data = old_data or {}
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_group': _('Invalid user group name'),
 
            'group_exist': _('User group "%(usergroup)s" already exists'),
 
            'invalid_usergroup_name':
 
                _('user group name may only contain alphanumeric '
 
                  'characters underscores, periods or dashes and must begin '
 
                  'with alphanumeric character')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value in ['default']:
 
                msg = M(self, 'invalid_group', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(users_group_name=msg)
 
                )
 
            #check if group is unique
 
            old_ugname = None
 
            if edit:
 
                old_id = old_data.get('users_group_id')
 
                old_ugname = UserGroup.get(old_id).users_group_name
 

	
 
            if old_ugname != value or not edit:
 
                is_existing_group = UserGroup.get_by_group_name(value,
 
                                                        case_insensitive=True)
 
                if is_existing_group:
 
                    msg = M(self, 'group_exist', state, usergroup=value)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(users_group_name=msg)
 
                    )
 

	
 
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
 
                msg = M(self, 'invalid_usergroup_name', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(users_group_name=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidRepoGroup(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'group_parent_id': _('Cannot assign this group as parent'),
 
            'parent_group_id': _('Cannot assign this group as parent'),
 
            'group_exists': _('Group "%(group_name)s" already exists'),
 
            'repo_exists':
 
                _('Repository with name "%(group_name)s" already exists')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            # TODO WRITE VALIDATIONS
 
            group_name = value.get('group_name')
 
            group_parent_id = value.get('group_parent_id')
 
            parent_group_id = value.get('parent_group_id')
 

	
 
            # slugify repo group just in case :)
 
            slug = repo_name_slug(group_name)
 

	
 
            # check for parent of self
 
            parent_of_self = lambda: (
 
                old_data['group_id'] == group_parent_id
 
                if group_parent_id else False
 
                old_data['group_id'] == parent_group_id
 
                if parent_group_id else False
 
            )
 
            if edit and parent_of_self():
 
                msg = M(self, 'group_parent_id', state)
 
                msg = M(self, 'parent_group_id', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(group_parent_id=msg)
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
            old_gname = None
 
            if edit:
 
                old_gname = RepoGroup.get(old_data.get('group_id')).group_name
 

	
 
            if old_gname != group_name or not edit:
 

	
 
                # check group
 
                gr = RepoGroup.query() \
 
                      .filter(RepoGroup.group_name == slug) \
 
                      .filter(RepoGroup.group_parent_id == group_parent_id) \
 
                      .filter(RepoGroup.parent_group_id == parent_group_id) \
 
                      .scalar()
 

	
 
                if gr is not None:
 
                    msg = M(self, 'group_exists', state, group_name=slug)
 
                    raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(group_name=msg)
 
                    )
 

	
 
                # check for same repo
 
                repo = Repository.query() \
 
                      .filter(Repository.repo_name == slug) \
 
                      .scalar()
 

	
 
                if repo is not None:
 
                    msg = M(self, 'repo_exists', state, group_name=slug)
 
                    raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(group_name=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def ValidPassword():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password':
 
                _('Invalid characters (non-ascii) in password')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                (value or '').decode('ascii')
 
            except UnicodeError:
 
                msg = M(self, 'invalid_password', state)
 
                raise formencode.Invalid(msg, value, state,)
 
    return _validator
 

	
 

	
 
def ValidOldPassword(username):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password': _('Invalid old password')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 
            if auth_modules.authenticate(username, value, '') is None:
 
                msg = M(self, 'invalid_password', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(current_password=msg)
 
                )
 
    return _validator
 

	
 

	
 
def ValidPasswordsMatch(password_field, password_confirmation_field):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'password_mismatch': _('Passwords do not match'),
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value.get(password_field) != value[password_confirmation_field]:
 
                msg = M(self, 'password_mismatch', state)
 
                raise formencode.Invalid(msg, value, state,
 
                     error_dict={password_field:msg, password_confirmation_field: msg}
 
                )
 
    return _validator
 

	
 

	
 
def ValidAuth():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_auth': _(u'Invalid username or password'),
 
        }
 

	
 
        def validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 

	
 
            password = value['password']
 
            username = value['username']
 

	
 
            # authenticate returns unused dict but has called
 
            # plugin._authenticate which has create_or_update'ed the username user in db
 
            if auth_modules.authenticate(username, password) is None:
 
                user = User.get_by_username_or_email(username)
 
                if user and not user.active:
 
                    log.warning('user %s is disabled', username)
 
                    msg = M(self, 'invalid_auth', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=' ', password=msg)
 
                    )
 
                else:
 
                    log.warning('user %s failed to authenticate', username)
 
                    msg = M(self, 'invalid_auth', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=' ', password=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidAuthToken():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_token': _('Token mismatch')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value != authentication_token():
 
                msg = M(self, 'invalid_token', state)
 
                raise formencode.Invalid(msg, value, state)
 
    return _validator
 

	
 

	
 
def ValidRepoName(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_repo_name':
 
                _('Repository name %(repo)s is not allowed'),
 
            'repository_exists':
 
                _('Repository named %(repo)s already exists'),
 
            'repository_in_group_exists': _('Repository "%(repo)s" already '
 
                                            'exists in group "%(group)s"'),
 
            'same_group_exists': _('Repository group with name "%(repo)s" '
 
                                   'already exists')
 
        }
 

	
 
        def _to_python(self, value, state):
 
            repo_name = repo_name_slug(value.get('repo_name', ''))
 
            repo_group = value.get('repo_group')
 
            if repo_group:
 
                gr = RepoGroup.get(repo_group)
 
                group_path = gr.full_path
 
                group_name = gr.group_name
 
                # value needs to be aware of group name in order to check
 
                # db key This is an actual just the name to store in the
 
                # database
 
                repo_name_full = group_path + RepoGroup.url_sep() + repo_name
 
            else:
 
                group_name = group_path = ''
 
                repo_name_full = repo_name
 

	
 
            value['repo_name'] = repo_name
 
            value['repo_name_full'] = repo_name_full
 
            value['group_path'] = group_path
 
            value['group_name'] = group_name
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            repo_name = value.get('repo_name')
 
            repo_name_full = value.get('repo_name_full')
 
            group_path = value.get('group_path')
 
            group_name = value.get('group_name')
 

	
 
            if repo_name in [ADMIN_PREFIX, '']:
 
                msg = M(self, 'invalid_repo_name', state, repo=repo_name)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_name=msg)
 
                )
 

	
 
            rename = old_data.get('repo_name') != repo_name_full
 
            create = not edit
 
            if rename or create:
 

	
 
                if group_path != '':
 
                    if Repository.get_by_repo_name(repo_name_full):
 
                        msg = M(self, 'repository_in_group_exists', state,
 
                                repo=repo_name, group=group_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                elif RepoGroup.get_by_group_name(repo_name_full):
 
                        msg = M(self, 'same_group_exists', state,
 
                                repo=repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 

	
 
                elif Repository.get_by_repo_name(repo_name_full):
 
                        msg = M(self, 'repository_exists', state,
 
                                repo=repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidForkName(*args, **kwargs):
 
    return ValidRepoName(*args, **kwargs)
 

	
 

	
 
def SlugifyName():
 
    class _validator(formencode.validators.FancyValidator):
 

	
 
        def _to_python(self, value, state):
 
            return repo_name_slug(value)
 

	
 
        def validate_python(self, value, state):
 
            pass
 

	
 
    return _validator
 

	
 

	
 
def ValidCloneUri():
 
    from kallithea.lib.utils import make_ui
 

	
 
    def url_handler(repo_type, url, ui):
 
        if repo_type == 'hg':
 
            from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
            if url.startswith('http') or url.startswith('ssh'):
 
                # initially check if it's at least the proper URL
 
                # or does it pass basic auth
 
                MercurialRepository._check_url(url, ui)
 
            elif url.startswith('svn+http'):
 
                from hgsubversion.svnrepo import svnremoterepo
 
                svnremoterepo(ui, url).svn.uuid
 
            elif url.startswith('git+http'):
 
                raise NotImplementedError()
 
            else:
 
                raise Exception('clone from URI %s not allowed' % (url,))
 

	
 
        elif repo_type == 'git':
 
            from kallithea.lib.vcs.backends.git.repository import GitRepository
 
            if url.startswith('http') or url.startswith('git'):
 
                # initially check if it's at least the proper URL
 
                # or does it pass basic auth
 
                GitRepository._check_url(url)
 
            elif url.startswith('svn+http'):
 
                raise NotImplementedError()
 
            elif url.startswith('hg+http'):
 
                raise NotImplementedError()
 
            else:
 
                raise Exception('clone from URI %s not allowed' % (url))
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'clone_uri': _('Invalid repository URL'),
 
            'invalid_clone_uri': _('Invalid repository URL. It must be a '
 
                                   'valid http, https, ssh, svn+http or svn+https URL'),
 
        }
 

	
 
        def validate_python(self, value, state):
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 

	
 
            if url and url != value.get('clone_uri_hidden'):
 
                try:
 
                    url_handler(repo_type, url, make_ui('db', clear_session=False))
 
                except Exception:
 
                    log.exception('URL validation failed')
 
                    msg = M(self, 'clone_uri')
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(clone_uri=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidForkType(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_fork_type': _('Fork has to be the same type as parent')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                msg = M(self, 'invalid_fork_type', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
    return _validator
 

	
 

	
 
def CanWriteGroup(old_data=None):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create repository in this group"),
 
            'permission_denied_root': _("no permission to create repository "
 
                                        "in root location")
 
        }
 

	
 
        def _to_python(self, value, state):
 
            #root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            # create repositories with write permission on group is set to true
 
            create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
 
            group_admin = HasRepoGroupPermissionAny('group.admin')(gr_name,
 
                                            'can write into group validator')
 
            group_write = HasRepoGroupPermissionAny('group.write')(gr_name,
 
                                            'can write into group validator')
 
            forbidden = not (group_admin or (group_write and create_on_write))
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            gid = (old_data['repo_group'].get('group_id')
 
                   if (old_data and 'repo_group' in old_data) else None)
 
            value_changed = gid != value
 
            new = not old_data
 
            # do check if we changed the value, there's a case that someone got
 
            # revoked write permissions to a repository, he still created, we
 
            # don't need to check permission if he didn't change the value of
 
            # groups in form box
 
            if value_changed or new:
 
                #parent group need to be existing
 
                if gr and forbidden:
 
                    msg = M(self, 'permission_denied', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 
                ## check if we can write to root location !
 
                elif gr is None and not can_create_repos():
 
                    msg = M(self, 'permission_denied_root', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create a group in this location")
 
        }
 

	
 
        def to_python(self, value, state):
 
            #root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                #we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and not can_create_in_root
 
            val = HasRepoGroupPermissionAny('group.admin')
 
            forbidden = not val(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = M(self, 'permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(group_parent_id=msg)
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'repo_group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 
    elif type_ == 'user_group':
 
        EMPTY_PERM = 'usergroup.none'
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _('This username or user group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = OrderedSet()
 
            perms_new = OrderedSet()
 
            # build a list of permission to update and new permission to create
 

	
 
            #CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
 
            new_perms_group = defaultdict(dict)
 
            for k, v in value.copy().iteritems():
 
                if k.startswith('perm_new_member'):
 
                    del value[k]
 
                    _type, part = k.split('perm_new_member_')
 
                    args = part.split('_')
 
                    if len(args) == 1:
 
                        new_perms_group[args[0]]['perm'] = v
 
                    elif len(args) == 2:
 
                        _key, pos = args
 
                        new_perms_group[pos][_key] = v
 

	
 
            # fill new permissions in order of how they were added
 
            for k in sorted(map(int, new_perms_group.keys())):
 
                perm_dict = new_perms_group[str(k)]
 
                new_member = perm_dict.get('name')
 
                new_perm = perm_dict.get('perm')
 
                new_type = perm_dict.get('type')
 
                if new_member and new_perm and new_type:
 
                    perms_new.add((new_member, new_perm, new_type))
 

	
 
            for k, v in value.iteritems():
 
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
 
                    if member == User.DEFAULT_USER:
 
                        if str2bool(value.get('repo_private')):
 
                            # set none for default when updating to
 
                            # private repo protects against form manipulation
 
                            v = EMPTY_PERM
 
                    perms_update.add((member, v, t))
 

	
 
            value['perms_updates'] = list(perms_update)
 
            value['perms_new'] = list(perms_new)
 

	
 
            # update permissions
 
            for k, v, t in perms_new:
 
                try:
 
                    if t is 'user':
 
                        self.user_db = User.query() \
 
                            .filter(User.active == True) \
 
                            .filter(User.username == k).one()
 
                    if t is 'users_group':
 
                        self.user_db = UserGroup.query() \
 
                            .filter(UserGroup.users_group_active == True) \
 
                            .filter(UserGroup.users_group_name == k).one()
 

	
 
                except Exception:
 
                    log.exception('Updated permission failed')
 
                    msg = M(self, 'perm_new_member_type', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(perm_new_member_name=msg)
 
                    )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidSettings():
 
    class _validator(formencode.validators.FancyValidator):
 
        def _to_python(self, value, state):
 
            # settings  form for users that are not admin
 
            # can't edit certain parameters, it's extra backup if they mangle
 
            # with forms
 

	
 
            forbidden_params = [
 
                'user', 'repo_type', 'repo_enable_locking',
 
                'repo_enable_downloads', 'repo_enable_statistics'
 
            ]
 

	
 
            for param in forbidden_params:
 
                if param in value:
 
                    del value[param]
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            pass
 
    return _validator
 

	
 

	
 
def ValidPath():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_path': _('This is not a valid path')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if not os.path.isdir(value):
 
                msg = M(self, 'invalid_path', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(paths_root_path=msg)
 
                )
 
    return _validator
 

	
 

	
 
def UniqSystemEmail(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'email_taken': _('This email address is already in use')
 
        }
 

	
 
        def _to_python(self, value, state):
 
            return value.lower()
 

	
 
        def validate_python(self, value, state):
 
            if (old_data.get('email') or '').lower() != value:
 
                user = User.get_by_email(value)
 
                if user is not None:
 
                    msg = M(self, 'email_taken', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(email=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidSystemEmail():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'non_existing_email': _('Email address "%(email)s" not found')
 
        }
 

	
 
        def _to_python(self, value, state):
 
            return value.lower()
 

	
 
        def validate_python(self, value, state):
 
            user = User.get_by_email(value)
 
            if user is None:
 
                msg = M(self, 'non_existing_email', state, email=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(email=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def LdapLibValidator():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 

	
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                import ldap
 
                ldap  # pyflakes silence !
 
            except ImportError:
 
                raise LdapImportError()
 

	
 
    return _validator
 

	
 

	
 
def AttrLoginValidator():
 
    class _validator(formencode.validators.UnicodeString):
 
        messages = {
 
            'invalid_cn':
 
                  _('The LDAP Login attribute of the CN must be specified - '
 
                    'this is the name of the attribute that is equivalent '
 
                    'to "username"')
 
        }
 
        messages['empty'] = messages['invalid_cn']
 

	
 
    return _validator
 

	
 

	
 
def ValidIp():
 
    class _validator(CIDR):
 
        messages = dict(
 
            badFormat=_('Please enter a valid IPv4 or IPv6 address'),
 
            illegalBits=_('The network size (bits) must be within the range'
 
                ' of 0-32 (not %(bits)r)')
 
        )
 

	
 
        def to_python(self, value, state):
 
            v = super(_validator, self).to_python(value, state)
 
            v = v.strip()
 
            net = ipaddr.IPNetwork(address=v)
 
            if isinstance(net, ipaddr.IPv4Network):
 
                #if IPv4 doesn't end with a mask, add /32
 
                if '/' not in value:
 
                    v += '/32'
 
            if isinstance(net, ipaddr.IPv6Network):
 
                #if IPv6 doesn't end with a mask, add /128
 
                if '/' not in value:
 
                    v += '/128'
 
            return v
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                addr = value.strip()
 
                #this raises an ValueError if address is not IPv4 or IPv6
 
                ipaddr.IPNetwork(address=addr)
 
            except ValueError:
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 

	
 
    return _validator
 

	
 

	
 
def FieldKey():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            badFormat=_('Key name can only consist of letters, '
 
                        'underscore, dash or numbers')
 
        )
 

	
 
        def validate_python(self, value, state):
 
            if not re.match('[a-zA-Z0-9_-]+$', value):
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 
    return _validator
 

	
 

	
 
def BasePath():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            badPath=_('Filename cannot be inside a directory')
 
        )
 

	
 
        def _to_python(self, value, state):
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            if value != os.path.basename(value):
 
                raise formencode.Invalid(self.message('badPath', state),
 
                                         value, state)
 
    return _validator
 

	
 

	
 
def ValidAuthPlugins():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            import_duplicate=_('Plugins %(loaded)s and %(next_to_load)s both export the same name')
 
        )
 

	
 
        def _to_python(self, value, state):
 
            # filter empty values
 
            return filter(lambda s: s not in [None, ''], value)
 

	
 
        def validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 
            module_list = value
 
            unique_names = {}
 
            try:
 
                for module in module_list:
 
                    plugin = auth_modules.loadplugin(module)
 
                    plugin_name = plugin.name
 
                    if plugin_name in unique_names:
 
                        msg = M(self, 'import_duplicate', state,
 
                                loaded=unique_names[plugin_name],
 
                                next_to_load=plugin_name)
 
                        raise formencode.Invalid(msg, value, state)
 
                    unique_names[plugin_name] = plugin
 
            except (ImportError, AttributeError, TypeError) as e:
 
                raise formencode.Invalid(str(e), value, state)
 

	
 
    return _validator
kallithea/templates/admin/repo_groups/repo_group_add.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="/base/base.html"/>
 

	
 
<%block name="title">
 
    ${_('Add Repository Group')}
 
</%block>
 

	
 
<%def name="breadcrumbs_links()">
 
    ${h.link_to(_('Admin'),h.url('admin_home'))}
 
    &raquo;
 
    ${h.link_to(_('Repository Groups'),h.url('repos_groups'))}
 
    &raquo;
 
    ${_('Add Repository Group')}
 
</%def>
 

	
 
<%block name="header_menu">
 
    ${self.menu('admin')}
 
</%block>
 

	
 
<%def name="main()">
 
<div class="panel panel-primary">
 
    <!-- box / title -->
 
    <div class="panel-heading">
 
        ${self.breadcrumbs()}
 
    </div>
 
    <!-- end box / title -->
 
    ${h.form(url('repos_groups'))}
 
    <div class="form">
 
        <!-- fields -->
 
        <div class="form-horizontal">
 
            <div class="form-group">
 
                <label class="control-label" for="group_name">${_('Group name')}:</label>
 
                <div class="input">
 
                    ${h.text('group_name',class_='small')}
 
                </div>
 
            </div>
 

	
 
            <div class="form-group">
 
                <label class="control-label" for="group_description">${_('Description')}:</label>
 
                <div class="textarea-repo editor">
 
                    ${h.textarea('group_description',cols=23,rows=5,class_="medium")}
 
                </div>
 
            </div>
 

	
 
            <div class="form-group">
 
                <label class="control-label" for="group_parent_id">${_('Group parent')}:</label>
 
                <label class="control-label" for="parent_group_id">${_('Group parent')}:</label>
 
                <div class="input">
 
                    ${h.select('group_parent_id',request.GET.get('parent_group'),c.repo_groups,class_="medium")}
 
                    ${h.select('parent_group_id',request.GET.get('parent_group'),c.repo_groups,class_="medium")}
 
                </div>
 
            </div>
 

	
 
            <div id="copy_perms" class="field">
 
                <label class="control-label" for="group_copy_permissions">${_('Copy parent group permissions')}:</label>
 
                <div class="checkboxes">
 
                    ${h.checkbox('group_copy_permissions',value="True")}
 
                    <span class="help-block">${_('Copy permission set from parent repository group.')}</span>
 
                </div>
 
            </div>
 

	
 
            <div class="form-group">
 
                <div class="buttons">
 
                    ${h.submit('save',_('Save'),class_="btn btn-default")}
 
                </div>
 
            </div>
 
        </div>
 
    </div>
 
    ${h.end_form()}
 
</div>
 
<script>
 
    $(document).ready(function(){
 
        var setCopyPermsOption = function(group_val){
 
            if(group_val != "-1"){
 
                $('#copy_perms').show();
 
            }
 
            else{
 
                $('#copy_perms').hide();
 
            }
 
        }
 
        $("#group_parent_id").select2({
 
        $("#parent_group_id").select2({
 
            'dropdownAutoWidth': true
 
        });
 
        setCopyPermsOption($('#group_parent_id').val());
 
        $("#group_parent_id").on("change", function(e) {
 
        setCopyPermsOption($('#parent_group_id').val());
 
        $("#parent_group_id").on("change", function(e) {
 
            setCopyPermsOption(e.val);
 
        });
 
        $('#group_name').focus();
 
    });
 
</script>
 
</%def>
kallithea/templates/admin/repo_groups/repo_group_edit_settings.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
${h.form(url('update_repos_group',group_name=c.repo_group.group_name))}
 
<div class="form">
 
    <!-- fields -->
 
    <div class="form-horizontal">
 
        <div class="form-group">
 
            <label class="control-label" for="group_name">${_('Group name')}:</label>
 
            <div class="input">
 
                ${h.text('group_name',class_='medium')}
 
            </div>
 
        </div>
 

	
 
        <div class="form-group">
 
            <label class="control-label" for="group_description">${_('Description')}:</label>
 
            <div class="textarea text-area editor">
 
                ${h.textarea('group_description',cols=23,rows=5,class_="medium")}
 
            </div>
 
        </div>
 

	
 
        <div class="form-group">
 
            <label class="control-label" for="group_parent_id">${_('Group parent')}:</label>
 
            <label class="control-label" for="parent_group_id">${_('Group parent')}:</label>
 
            <div class="input">
 
                ${h.select('group_parent_id','',c.repo_groups,class_="medium")}
 
                ${h.select('parent_group_id','',c.repo_groups,class_="medium")}
 
            </div>
 
        </div>
 

	
 
        <div class="form-group">
 
            <label class="control-label" for="enable_locking">${_('Enable locking')}:</label>
 
            <div class="checkboxes">
 
                ${h.checkbox('enable_locking',value="True")}
 
                <span class="help-block">${_('Enable lock-by-pulling on group. This option will be applied to all other groups and repositories inside')}</span>
 
            </div>
 
        </div>
 

	
 
        <div class="form-group">
 
            <div class="buttons">
 
                ${h.submit('save',_('Save'),class_="btn btn-default")}
 
                ${h.reset('reset',_('Reset'),class_="btn btn-default")}
 
            </div>
 
        </div>
 
    </div>
 
</div>
 
${h.end_form()}
 

	
 
${h.form(url('delete_repo_group', group_name=c.repo_group.group_name))}
 
<div class="form">
 
    <div class="form-horizontal">
 
        <div class="form-group">
 
            <div class="buttons">
 
                ${h.submit('remove_%s' % c.repo_group.group_name,_('Remove this group'),class_="btn btn-danger",onclick="return confirm('"+_('Confirm to delete this group')+"');")}
 
            </div>
 
        </div>
 
    </div>
 
</div>
 
${h.end_form()}
 

	
 
<script>
 
    $(document).ready(function(){
 
        $("#group_parent_id").select2({
 
        $("#parent_group_id").select2({
 
            'dropdownAutoWidth': true
 
        });
 
    });
 
</script>
kallithea/tests/fixture.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Helpers for fixture generation
 
"""
 
import os
 
import time
 
from kallithea.tests.base import *
 
from kallithea.model.db import Repository, User, RepoGroup, UserGroup, Gist
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from os.path import dirname
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
 

	
 

	
 
def error_function(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
class Fixture(object):
 

	
 
    def __init__(self):
 
        pass
 

	
 
    def anon_access(self, status):
 
        """
 
        Context manager for controlling anonymous access.
 
        Anon access will be set and committed, but restored again when exiting the block.
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
        with fixture.anon_access(False):
 
            stuff
 
        """
 

	
 
        class context(object):
 
            def __enter__(self):
 
                anon = User.get_default_user()
 
                self._before = anon.active
 
                anon.active = status
 
                Session().commit()
 
                invalidate_all_caches()
 

	
 
            def __exit__(self, exc_type, exc_val, exc_tb):
 
                anon = User.get_default_user()
 
                anon.active = self._before
 
                Session().commit()
 

	
 
        return context()
 

	
 
    def _get_repo_create_params(self, **custom):
 
        defs = dict(
 
            repo_name=None,
 
            repo_type='hg',
 
            clone_uri='',
 
            repo_group=u'-1',
 
            repo_description=u'DESC',
 
            repo_private=False,
 
            repo_landing_rev='rev:tip',
 
            repo_copy_permissions=False,
 
            repo_state=Repository.STATE_CREATED,
 
        )
 
        defs.update(custom)
 
        if 'repo_name_full' not in custom:
 
            defs.update({'repo_name_full': defs['repo_name']})
 

	
 
        # fix the repo name if passed as repo_name_full
 
        if defs['repo_name']:
 
            defs['repo_name'] = defs['repo_name'].split('/')[-1]
 

	
 
        return defs
 

	
 
    def _get_group_create_params(self, **custom):
 
        defs = dict(
 
            group_name=None,
 
            group_description=u'DESC',
 
            group_parent_id=None,
 
            parent_group_id=None,
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_create_params(self, name, **custom):
 
        defs = dict(
 
            username=name,
 
            password='qweqwe',
 
            email='%s+test@example.com' % name,
 
            firstname=u'TestUser',
 
            lastname=u'Test',
 
            active=True,
 
            admin=False,
 
            extern_type='internal',
 
            extern_name=None
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_group_create_params(self, name, **custom):
 
        defs = dict(
 
            users_group_name=name,
 
            user_group_description=u'DESC',
 
            users_group_active=True,
 
            user_group_data={},
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def create_repo(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            r = Repository.get_by_repo_name(name)
 
            if r:
 
                return r
 

	
 
        if isinstance(kwargs.get('repo_group'), RepoGroup):
 
            kwargs['repo_group'] = kwargs['repo_group'].group_id
 

	
 
        form_data = self._get_repo_create_params(repo_name=name, **kwargs)
 
        cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create(form_data, cur_user)
 
        Session().commit()
 
        ScmModel().mark_for_invalidation(name)
 
        return Repository.get_by_repo_name(name)
 

	
 
    def create_fork(self, repo_to_fork, fork_name, **kwargs):
 
        repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
 

	
 
        form_data = self._get_repo_create_params(repo_name=fork_name,
 
                                            fork_parent_id=repo_to_fork,
 
                                            repo_type=repo_to_fork.repo_type,
 
                                            **kwargs)
 
        form_data['update_after_clone'] = False
 

	
 
        #TODO: fix it !!
 
        form_data['description'] = form_data['repo_description']
 
        form_data['private'] = form_data['repo_private']
 
        form_data['landing_rev'] = form_data['repo_landing_rev']
 

	
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create_fork(form_data, cur_user=owner)
 
        Session().commit()
 
        ScmModel().mark_for_invalidation(fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r
 
        return r
 

	
 
    def destroy_repo(self, repo_name, **kwargs):
 
        RepoModel().delete(repo_name, **kwargs)
 
        Session().commit()
 

	
 
    def create_repo_group(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            gr = RepoGroup.get_by_group_name(group_name=name)
 
            if gr:
 
                return gr
 
        form_data = self._get_group_create_params(group_name=name, **kwargs)
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        gr = RepoGroupModel().create(
 
            group_name=form_data['group_name'],
 
            group_description=form_data['group_name'],
 
            owner=owner, parent=form_data['group_parent_id'])
 
            owner=owner, parent=form_data['parent_group_id'])
 
        Session().commit()
 
        gr = RepoGroup.get_by_group_name(gr.group_name)
 
        return gr
 

	
 
    def destroy_repo_group(self, repogroupid):
 
        RepoGroupModel().delete(repogroupid)
 
        Session().commit()
 

	
 
    def create_user(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            user = User.get_by_username(name)
 
            if user:
 
                return user
 
        form_data = self._get_user_create_params(name, **kwargs)
 
        user = UserModel().create(form_data)
 
        Session().commit()
 
        user = User.get_by_username(user.username)
 
        return user
 

	
 
    def destroy_user(self, userid):
 
        UserModel().delete(userid)
 
        Session().commit()
 

	
 
    def create_user_group(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            gr = UserGroup.get_by_group_name(group_name=name)
 
            if gr:
 
                return gr
 
        form_data = self._get_user_group_create_params(name, **kwargs)
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        user_group = UserGroupModel().create(
 
            name=form_data['users_group_name'],
 
            description=form_data['user_group_description'],
 
            owner=owner, active=form_data['users_group_active'],
 
            group_data=form_data['user_group_data'])
 
        Session().commit()
 
        user_group = UserGroup.get_by_group_name(user_group.users_group_name)
 
        return user_group
 

	
 
    def destroy_user_group(self, usergroupid):
 
        UserGroupModel().delete(user_group=usergroupid, force=True)
 
        Session().commit()
 

	
 
    def create_gist(self, **kwargs):
 
        form_data = {
 
            'description': u'new-gist',
 
            'owner': TEST_USER_ADMIN_LOGIN,
 
            'gist_type': Gist.GIST_PUBLIC,
 
            'lifetime': -1,
 
            'gist_mapping': {'filename1.txt':{'content':'hello world'},}
 
        }
 
        form_data.update(kwargs)
 
        gist = GistModel().create(
 
            description=form_data['description'],owner=form_data['owner'],
 
            gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
 
            lifetime=form_data['lifetime']
 
        )
 
        Session().commit()
 

	
 
        return gist
 

	
 
    def destroy_gists(self, gistid=None):
 
        for g in Gist.query():
 
            if gistid:
 
                if gistid == g.gist_access_id:
 
                    GistModel().delete(g)
 
            else:
 
                GistModel().delete(g)
 
        Session().commit()
 

	
 
    def load_resource(self, resource_name, strip=True):
 
        with open(os.path.join(FIXTURES, resource_name), 'rb') as f:
 
            source = f.read()
 
            if strip:
 
                source = source.strip()
 

	
 
        return source
 

	
 
    def commit_change(self, repo, filename, content, message, vcs_type, parent=None, newfile=False):
 
        repo = Repository.get_by_repo_name(repo)
 
        _cs = parent
 
        if not parent:
 
            _cs = EmptyChangeset(alias=vcs_type)
 

	
 
        if newfile:
 
            nodes = {
 
                filename: {
 
                    'content': content
 
                }
 
            }
 
            cs = ScmModel().create_nodes(
 
                user=TEST_USER_ADMIN_LOGIN, repo=repo,
 
                message=message,
 
                nodes=nodes,
 
                parent_cs=_cs,
 
                author=TEST_USER_ADMIN_LOGIN,
 
            )
 
        else:
 
            cs = ScmModel().commit_change(
 
                repo=repo.scm_instance, repo_name=repo.repo_name,
 
                cs=parent, user=TEST_USER_ADMIN_LOGIN,
 
                author=TEST_USER_ADMIN_LOGIN,
 
                message=message,
 
                content=content,
 
                f_path=filename
 
            )
 
        return cs
kallithea/tests/models/common.py
Show inline comments
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 

	
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.db import RepoGroup, Repository, User
 
from kallithea.model.user import UserModel
 

	
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.meta import Session
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
def _destroy_project_tree(test_u1_id):
 
    Session.remove()
 
    repo_group = RepoGroup.get_by_group_name(group_name=u'g0')
 
    for el in reversed(repo_group.recursive_groups_and_repos()):
 
        if isinstance(el, Repository):
 
            RepoModel().delete(el)
 
        elif isinstance(el, RepoGroup):
 
            RepoGroupModel().delete(el, force_delete=True)
 

	
 
    u = User.get(test_u1_id)
 
    Session().delete(u)
 
    Session().commit()
 

	
 

	
 
def _create_project_tree():
 
    """
 
    Creates a tree of groups and repositories to test permissions
 

	
 
    structure
 
     [g0] - group `g0` with 3 subgroups
 
     |
 
     |__[g0_1] group g0_1 with 2 groups 0 repos
 
     |  |
 
     |  |__[g0_1_1] group g0_1_1 with 1 group 2 repos
 
     |  |   |__<g0/g0_1/g0_1_1/g0_1_1_r1>
 
     |  |   |__<g0/g0_1/g0_1_1/g0_1_1_r2>
 
     |  |__<g0/g0_1/g0_1_r1>
 
     |
 
     |__[g0_2] 2 repos
 
     |  |
 
     |  |__<g0/g0_2/g0_2_r1>
 
     |  |__<g0/g0_2/g0_2_r2>
 
     |
 
     |__[g0_3] 1 repo
 
        |
 
        |_<g0/g0_3/g0_3_r1>
 
        |_<g0/g0_3/g0_3_r2_private>
 

	
 
    """
 
    test_u1 = UserModel().create_or_update(
 
        username=u'test_u1', password=u'qweqwe',
 
        email=u'test_u1@example.com', firstname=u'test_u1', lastname=u'test_u1'
 
    )
 
    g0 = fixture.create_repo_group(u'g0')
 
    g0_1 = fixture.create_repo_group(u'g0_1', group_parent_id=g0)
 
    g0_1_1 = fixture.create_repo_group(u'g0_1_1', group_parent_id=g0_1)
 
    g0_1 = fixture.create_repo_group(u'g0_1', parent_group_id=g0)
 
    g0_1_1 = fixture.create_repo_group(u'g0_1_1', parent_group_id=g0_1)
 
    g0_1_1_r1 = fixture.create_repo(u'g0/g0_1/g0_1_1/g0_1_1_r1', repo_group=g0_1_1)
 
    g0_1_1_r2 = fixture.create_repo(u'g0/g0_1/g0_1_1/g0_1_1_r2', repo_group=g0_1_1)
 
    g0_1_r1 = fixture.create_repo(u'g0/g0_1/g0_1_r1', repo_group=g0_1)
 
    g0_2 = fixture.create_repo_group(u'g0_2', group_parent_id=g0)
 
    g0_2 = fixture.create_repo_group(u'g0_2', parent_group_id=g0)
 
    g0_2_r1 = fixture.create_repo(u'g0/g0_2/g0_2_r1', repo_group=g0_2)
 
    g0_2_r2 = fixture.create_repo(u'g0/g0_2/g0_2_r2', repo_group=g0_2)
 
    g0_3 = fixture.create_repo_group(u'g0_3', group_parent_id=g0)
 
    g0_3 = fixture.create_repo_group(u'g0_3', parent_group_id=g0)
 
    g0_3_r1 = fixture.create_repo(u'g0/g0_3/g0_3_r1', repo_group=g0_3)
 
    g0_3_r2_private = fixture.create_repo(u'g0/g0_3/g0_3_r1_private',
 
                                          repo_group=g0_3, repo_private=True)
 
    return test_u1
 

	
 

	
 
def expected_count(group_name, objects=False):
 
    repo_group = RepoGroup.get_by_group_name(group_name=group_name)
 
    objs = repo_group.recursive_groups_and_repos()
 
    if objects:
 
        return objs
 
    return len(objs)
 

	
 

	
 
def _check_expected_count(items, repo_items, expected):
 
    should_be = len(items + repo_items)
 
    there_are = len(expected)
 
    assert should_be == there_are, ('%s != %s' % ((items + repo_items), expected))
 

	
 

	
 
def check_tree_perms(obj_name, repo_perm, prefix, expected_perm):
 
    assert repo_perm == expected_perm, ('obj:`%s` got perm:`%s` should:`%s`'
 
                                    % (obj_name, repo_perm, expected_perm))
 

	
 

	
 
def _get_perms(filter_='', recursive=None, key=None, test_u1_id=None):
 
    test_u1 = AuthUser(user_id=test_u1_id)
 
    for k, v in test_u1.permissions[key].items():
 
        if recursive in ['all', 'repos', 'groups'] and k.startswith(filter_):
 
            yield k, v
 
        elif recursive in ['none']:
 
            if k == filter_:
 
                yield k, v

Changeset was too big and was cut off... Show full diff anyway

0 comments (0 inline, 0 general)