Changeset - b4d1e85265c1
[Not reviewed]
default
0 9 0
Søren Løvborg - 9 years ago 2017-02-20 19:31:48
sorenl@unity3d.com
auth: simplify repository group permission checks

In practice, Kallithea has the 'group.admin' permission imply the
'group.write' permission, which again implies 'group.read'.

This codifies this practice by replacing HasRepoGroupPermissionAny
"perm function" with the new HasRepoGroupLevel function, reducing the
risk of errors and saving quite a lot of typing.
9 files changed with 59 insertions and 66 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/repo_groups.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.repo_groups
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Repository groups controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 23, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 
import formencode
 
import itertools
 

	
 
from formencode import htmlfill
 

	
 
from pylons import request, tmpl_context as c
 
from pylons.i18n.translation import _, ungettext
 
from webob.exc import HTTPFound, HTTPForbidden, HTTPNotFound, HTTPInternalServerError
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import LoginRequired, \
 
    HasRepoGroupPermissionAnyDecorator, HasRepoGroupPermissionAny, \
 
    HasRepoGroupPermissionLevelDecorator, HasRepoGroupPermissionLevel, \
 
    HasPermissionAny
 
from kallithea.lib.base import BaseController, render
 
from kallithea.model.db import RepoGroup, Repository
 
from kallithea.model.scm import RepoGroupList, AvailableRepoGroupChoices
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.forms import RepoGroupForm, RepoGroupPermsForm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.lib.utils2 import safe_int
 
from sqlalchemy.sql.expression import func
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoGroupsController(BaseController):
 

	
 
    @LoginRequired()
 
    def __before__(self):
 
        super(RepoGroupsController, self).__before__()
 

	
 
    def __load_defaults(self, extras=(), exclude=()):
 
        """extras is used for keeping current parent ignoring permissions
 
        exclude is used for not moving group to itself TODO: also exclude descendants
 
        Note: only admin can create top level groups
 
        """
 
        repo_groups = AvailableRepoGroupChoices([], ['group.admin'], extras)
 
        repo_groups = AvailableRepoGroupChoices([], 'admin', extras)
 
        exclude_group_ids = set(rg.group_id for rg in exclude)
 
        c.repo_groups = [rg for rg in repo_groups
 
                         if rg[0] not in exclude_group_ids]
 

	
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.user_groups_array = repo_model.get_user_groups_js()
 

	
 
    def __load_data(self, group_id):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param group_id:
 
        """
 
        repo_group = RepoGroup.get_or_404(group_id)
 
        data = repo_group.get_dict()
 
        data['group_name'] = repo_group.name
 

	
 
        # fill repository group users
 
        for p in repo_group.repo_group_to_perm:
 
            data.update({'u_perm_%s' % p.user.username:
 
                             p.permission.permission_name})
 

	
 
        # fill repository group groups
 
        for p in repo_group.users_group_to_perm:
 
            data.update({'g_perm_%s' % p.users_group.users_group_name:
 
                             p.permission.permission_name})
 

	
 
        return data
 

	
 
    def _revoke_perms_on_yourself(self, form_result):
 
        _up = filter(lambda u: request.authuser.username == u[0],
 
                     form_result['perms_updates'])
 
        _new = filter(lambda u: request.authuser.username == u[0],
 
                      form_result['perms_new'])
 
        if _new and _new[0][1] != 'group.admin' or _up and _up[0][1] != 'group.admin':
 
            return True
 
        return False
 

	
 
    def index(self, format='html'):
 
        _list = RepoGroup.query(sorted=True).all()
 
        group_iter = RepoGroupList(_list, perm_set=['group.admin'])
 
        group_iter = RepoGroupList(_list, perm_level='admin')
 
        repo_groups_data = []
 
        total_records = len(group_iter)
 
        _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        repo_group_name = lambda repo_group_name, children_groups: (
 
            template.get_def("repo_group_name")
 
            .render(repo_group_name, children_groups, _=_, h=h, c=c)
 
        )
 
        repo_group_actions = lambda repo_group_id, repo_group_name, gr_count: (
 
            template.get_def("repo_group_actions")
 
            .render(repo_group_id, repo_group_name, gr_count, _=_, h=h, c=c,
 
                    ungettext=ungettext)
 
        )
 

	
 
        for repo_gr in group_iter:
 
            children_groups = map(h.safe_unicode,
 
                itertools.chain((g.name for g in repo_gr.parents),
 
                                (x.name for x in [repo_gr])))
 
            repo_count = repo_gr.repositories.count()
 
            repo_groups_data.append({
 
                "raw_name": repo_gr.group_name,
 
                "group_name": repo_group_name(repo_gr.group_name, children_groups),
 
                "desc": h.escape(repo_gr.group_description),
 
                "repos": repo_count,
 
                "owner": h.person(repo_gr.owner),
 
                "action": repo_group_actions(repo_gr.group_id, repo_gr.group_name,
 
                                             repo_count)
 
            })
 

	
 
        c.data = json.dumps({
 
            "totalRecords": total_records,
 
            "startIndex": 0,
 
            "sort": None,
 
            "dir": "asc",
 
            "records": repo_groups_data
 
        })
 

	
 
        return render('admin/repo_groups/repo_groups.html')
 

	
 
    def create(self):
 
        self.__load_defaults()
 

	
 
        # permissions for can create group based on parent_id are checked
 
        # here in the Form
 
        repo_group_form = RepoGroupForm(repo_groups=c.repo_groups)
 
        try:
 
            form_result = repo_group_form.to_python(dict(request.POST))
 
            gr = RepoGroupModel().create(
 
                group_name=form_result['group_name'],
 
                group_description=form_result['group_description'],
 
                parent=form_result['parent_group_id'],
 
                owner=request.authuser.user_id, # TODO: make editable
 
                copy_permissions=form_result['group_copy_permissions']
 
            )
 
            Session().commit()
 
            #TODO: in futureaction_logger(, '', '', '', self.sa)
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('admin/repo_groups/repo_group_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during creation of repository group %s') \
 
                    % request.POST.get('group_name'), category='error')
 
            parent_group_id = form_result['parent_group_id']
 
            #TODO: maybe we should get back to the main view, not the admin one
 
            raise HTTPFound(location=url('repos_groups', parent_group=parent_group_id))
 
        h.flash(_('Created repository group %s') % gr.group_name,
 
                category='success')
 
        raise HTTPFound(location=url('repos_group_home', group_name=gr.group_name))
 

	
 
    def new(self):
 
        if HasPermissionAny('hg.admin')('group create'):
 
            #we're global admin, we're ok and we can create TOP level groups
 
            pass
 
        else:
 
            # we pass in parent group into creation form, thus we know
 
            # what would be the group, we can check perms here !
 
            group_id = safe_int(request.GET.get('parent_group'))
 
            group = RepoGroup.get(group_id) if group_id else None
 
            group_name = group.group_name if group else None
 
            if HasRepoGroupPermissionAny('group.admin')(group_name, 'group create'):
 
            if HasRepoGroupPermissionLevel('admin')(group_name, 'group create'):
 
                pass
 
            else:
 
                raise HTTPForbidden()
 

	
 
        self.__load_defaults()
 
        return render('admin/repo_groups/repo_group_add.html')
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def update(self, group_name):
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults(extras=[c.repo_group.parent_group],
 
                             exclude=[c.repo_group])
 

	
 
        # TODO: kill allow_empty_group - it is only used for redundant form validation!
 
        if HasPermissionAny('hg.admin')('group edit'):
 
            #we're global admin, we're ok and we can create TOP level groups
 
            allow_empty_group = True
 
        elif not c.repo_group.parent_group:
 
            allow_empty_group = True
 
        else:
 
            allow_empty_group = False
 
        repo_group_form = RepoGroupForm(
 
            edit=True,
 
            old_data=c.repo_group.get_dict(),
 
            repo_groups=c.repo_groups,
 
            can_create_in_root=allow_empty_group,
 
        )()
 
        try:
 
            form_result = repo_group_form.to_python(dict(request.POST))
 

	
 
            new_gr = RepoGroupModel().update(group_name, form_result)
 
            Session().commit()
 
            h.flash(_('Updated repository group %s') \
 
                    % form_result['group_name'], category='success')
 
            # we now have new name !
 
            group_name = new_gr.group_name
 
            #TODO: in future action_logger(, '', '', '', self.sa)
 
        except formencode.Invalid as errors:
 
            c.active = 'settings'
 
            return htmlfill.render(
 
                render('admin/repo_groups/repo_group_edit.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of repository group %s') \
 
                    % request.POST.get('group_name'), category='error')
 

	
 
        raise HTTPFound(location=url('edit_repo_group', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def delete(self, group_name):
 
        gr = c.repo_group = RepoGroup.guess_instance(group_name)
 
        repos = gr.repositories.all()
 
        if repos:
 
            h.flash(_('This group contains %s repositories and cannot be '
 
                      'deleted') % len(repos), category='warning')
 
            raise HTTPFound(location=url('repos_groups'))
 

	
 
        children = gr.children.all()
 
        if children:
 
            h.flash(_('This group contains %s subgroups and cannot be deleted'
 
                      % (len(children))), category='warning')
 
            raise HTTPFound(location=url('repos_groups'))
 

	
 
        try:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            h.flash(_('Removed repository group %s') % group_name,
 
                    category='success')
 
            #TODO: in future action_logger(, '', '', '', self.sa)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during deletion of repository group %s')
 
                    % group_name, category='error')
 

	
 
        if gr.parent_group:
 
            raise HTTPFound(location=url('repos_group_home', group_name=gr.parent_group.group_name))
 
        raise HTTPFound(location=url('repos_groups'))
 

	
 
    def show_by_name(self, group_name):
 
        """
 
        This is a proxy that does a lookup group_name -> id, and shows
 
        the group by id view instead
 
        """
 
        group_name = group_name.rstrip('/')
 
        id_ = RepoGroup.get_by_group_name(group_name)
 
        if id_:
 
            return self.show(group_name)
 
        raise HTTPNotFound
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.read', 'group.write',
 
                                         'group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('read')
 
    def show(self, group_name):
 
        c.active = 'settings'
 

	
 
        c.group = c.repo_group = RepoGroup.guess_instance(group_name)
 

	
 
        groups = RepoGroup.query(sorted=True).filter_by(parent_group=c.group).all()
 
        c.groups = self.scm_model.get_repo_groups(groups)
 

	
 
        repos_list = Repository.query(sorted=True).filter_by(group=c.group).all()
 
        repos_data = RepoModel().get_repos_as_dict(repos_list=repos_list,
 
                                                   admin=False, short_name=True)
 
        #json used to render the grid
 
        c.data = json.dumps(repos_data)
 

	
 
        return render('admin/repo_groups/repo_group_show.html')
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit(self, group_name):
 
        c.active = 'settings'
 

	
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults(extras=[c.repo_group.parent_group],
 
                             exclude=[c.repo_group])
 
        defaults = self.__load_data(c.repo_group.group_id)
 

	
 
        return htmlfill.render(
 
            render('admin/repo_groups/repo_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit_repo_group_advanced(self, group_name):
 
        c.active = 'advanced'
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 

	
 
        return render('admin/repo_groups/repo_group_edit.html')
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit_repo_group_perms(self, group_name):
 
        c.active = 'perms'
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults()
 
        defaults = self.__load_data(c.repo_group.group_id)
 

	
 
        return htmlfill.render(
 
            render('admin/repo_groups/repo_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def update_perms(self, group_name):
 
        """
 
        Update permissions for given repository group
 

	
 
        :param group_name:
 
        """
 

	
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        valid_recursive_choices = ['none', 'repos', 'groups', 'all']
 
        form_result = RepoGroupPermsForm(valid_recursive_choices)().to_python(request.POST)
 
        if not request.authuser.is_admin:
 
            if self._revoke_perms_on_yourself(form_result):
 
                msg = _('Cannot revoke permission for yourself as admin')
 
                h.flash(msg, category='warning')
 
                raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 
        recursive = form_result['recursive']
 
        # iterate over all members(if in recursive mode) of this groups and
 
        # set the permissions !
 
        # this can be potentially heavy operation
 
        RepoGroupModel()._update_permissions(c.repo_group,
 
                                             form_result['perms_new'],
 
                                             form_result['perms_updates'],
 
                                             recursive)
 
        #TODO: implement this
 
        #action_logger(request.authuser, 'admin_changed_repo_permissions',
 
        #              repo_name, request.ip_addr, self.sa)
 
        Session().commit()
 
        h.flash(_('Repository group permissions updated'), category='success')
 
        raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def delete_perms(self, group_name):
 
        try:
 
            obj_type = request.POST.get('obj_type')
 
            obj_id = None
 
            if obj_type == 'user':
 
                obj_id = safe_int(request.POST.get('user_id'))
 
            elif obj_type == 'user_group':
 
                obj_id = safe_int(request.POST.get('user_group_id'))
 

	
 
            if not request.authuser.is_admin:
 
                if obj_type == 'user' and request.authuser.user_id == obj_id:
 
                    msg = _('Cannot revoke permission for yourself as admin')
 
                    h.flash(msg, category='warning')
 
                    raise Exception('revoke admin permission on self')
 
            recursive = request.POST.get('recursive', 'none')
 
            if obj_type == 'user':
 
                RepoGroupModel().delete_permission(repo_group=group_name,
 
                                                   obj=obj_id, obj_type='user',
 
                                                   recursive=recursive)
 
            elif obj_type == 'user_group':
 
                RepoGroupModel().delete_permission(repo_group=group_name,
 
                                                   obj=obj_id,
 
                                                   obj_type='user_group',
 
                                                   recursive=recursive)
 

	
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during revoking of permission'),
 
                    category='error')
 
            raise HTTPInternalServerError()
kallithea/controllers/admin/repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.repos
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Repositories controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 7, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 
import formencode
 
from formencode import htmlfill
 
from pylons import request, tmpl_context as c
 
from pylons.i18n.translation import _
 
from sqlalchemy.sql.expression import func
 
from webob.exc import HTTPFound, HTTPInternalServerError, HTTPForbidden, HTTPNotFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import LoginRequired, \
 
    HasRepoPermissionLevelDecorator, NotAnonymous, HasPermissionAny
 
from kallithea.lib.base import BaseRepoController, render, jsonify
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.vcs import RepositoryError
 
from kallithea.model.meta import Session
 
from kallithea.model.db import User, Repository, UserFollowing, RepoGroup, \
 
    Setting, RepositoryField
 
from kallithea.model.forms import RepoForm, RepoFieldForm, RepoPermsForm
 
from kallithea.model.scm import ScmModel, AvailableRepoGroupChoices, RepoList
 
from kallithea.model.repo import RepoModel
 
from kallithea.lib.compat import json
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.lib.utils2 import safe_int
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ReposController(BaseRepoController):
 
    """
 
    REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('repo', 'repos')
 

	
 
    @LoginRequired()
 
    def __before__(self):
 
        super(ReposController, self).__before__()
 

	
 
    def _load_repo(self):
 
        repo_obj = c.db_repo
 

	
 
        if repo_obj is None:
 
            h.not_mapped_error(c.repo_name)
 
            raise HTTPFound(location=url('repos'))
 

	
 
        return repo_obj
 

	
 
    def __load_defaults(self, repo=None):
 
        top_perms = ['hg.create.repository']
 
        repo_group_perms = ['group.admin']
 
        if HasPermissionAny('hg.create.write_on_repogroup.true')():
 
            repo_group_perms.append('group.write')
 
            repo_group_perm_level = 'write'
 
        else:
 
            repo_group_perm_level = 'admin'
 
        extras = [] if repo is None else [repo.group]
 

	
 
        c.repo_groups = AvailableRepoGroupChoices(top_perms, repo_group_perms, extras)
 
        c.repo_groups = AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras)
 

	
 
        c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs(repo)
 

	
 
    def __load_data(self):
 
        """
 
        Load defaults settings for edit, and update
 
        """
 
        c.repo_info = self._load_repo()
 
        self.__load_defaults(c.repo_info)
 

	
 
        defaults = RepoModel()._get_defaults(c.repo_name)
 
        defaults['clone_uri'] = c.repo_info.clone_uri_hidden # don't show password
 

	
 
        return defaults
 

	
 
    def index(self, format='html'):
 
        _list = Repository.query(sorted=True).all()
 

	
 
        c.repos_list = RepoList(_list, perm_level='admin')
 
        repos_data = RepoModel().get_repos_as_dict(repos_list=c.repos_list,
 
                                                   admin=True,
 
                                                   super_user_actions=True)
 
        #json used to render the grid
 
        c.data = json.dumps(repos_data)
 

	
 
        return render('admin/repos/repos.html')
 

	
 
    @NotAnonymous()
 
    def create(self):
 
        self.__load_defaults()
 
        form_result = {}
 
        try:
 
            # CanWriteGroup validators checks permissions of this POST
 
            form_result = RepoForm(repo_groups=c.repo_groups,
 
                                   landing_revs=c.landing_revs_choices)() \
 
                            .to_python(dict(request.POST))
 

	
 
            # create is done sometimes async on celery, db transaction
 
            # management is handled there.
 
            task = RepoModel().create(form_result, request.authuser.user_id)
 
            task_id = task.task_id
 
        except formencode.Invalid as errors:
 
            log.info(errors)
 
            return htmlfill.render(
 
                render('admin/repos/repo_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                force_defaults=False,
 
                encoding="UTF-8")
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            msg = (_('Error creating repository %s')
 
                   % form_result.get('repo_name'))
 
            h.flash(msg, category='error')
 
            raise HTTPFound(location=url('home'))
 

	
 
        raise HTTPFound(location=h.url('repo_creating_home',
 
                              repo_name=form_result['repo_name_full'],
 
                              task_id=task_id))
 

	
 
    @NotAnonymous()
 
    def create_repository(self):
 
        self.__load_defaults()
 
        if not c.repo_groups:
 
            raise HTTPForbidden
 
        parent_group = request.GET.get('parent_group')
 

	
 
        ## apply the defaults from defaults page
 
        defaults = Setting.get_default_repo_settings(strip_prefix=True)
 
        if parent_group:
 
            prg = RepoGroup.get(parent_group)
 
            if prg is None or not any(rgc[0] == prg.group_id
 
                                      for rgc in c.repo_groups):
 
                raise HTTPForbidden
 
            defaults.update({'repo_group': parent_group})
 

	
 
        return htmlfill.render(
 
            render('admin/repos/repo_add.html'),
 
            defaults=defaults,
 
            errors={},
 
            prefix_error=False,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    def repo_creating(self, repo_name):
 
        c.repo = repo_name
 
        c.task_id = request.GET.get('task_id')
 
        if not c.repo:
 
            raise HTTPNotFound()
 
        return render('admin/repos/repo_creating.html')
 

	
 
    @LoginRequired()
kallithea/controllers/api/api.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.api.api
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
API controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import time
 
import traceback
 
import logging
 
from sqlalchemy import or_
 

	
 
from pylons import request
 

	
 
from kallithea.controllers.api import JSONRPCController, JSONRPCError
 
from kallithea.lib.auth import (
 
    PasswordGenerator, AuthUser, HasPermissionAnyDecorator,
 
    HasPermissionAnyDecorator, HasPermissionAny, HasRepoPermissionLevel,
 
    HasRepoGroupPermissionAny, HasUserGroupPermissionAny)
 
    HasRepoGroupPermissionLevel, HasUserGroupPermissionAny)
 
from kallithea.lib.utils import map_groups, repo2db_mapper
 
from kallithea.lib.utils2 import (
 
    str2bool, time_to_datetime, safe_int, Optional, OAttr)
 
from kallithea.model.meta import Session
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel, UserGroupList
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.db import (
 
    Repository, Setting, UserIpMap, Permission, User, Gist,
 
    RepoGroup, UserGroup)
 
from kallithea.lib.compat import json
 
from kallithea.lib.exceptions import (
 
    DefaultUserException, UserGroupsAssignedException)
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def store_update(updates, attr, name):
 
    """
 
    Stores param in updates dict if it's not instance of Optional
 
    allows easy updates of passed in params
 
    """
 
    if not isinstance(attr, Optional):
 
        updates[name] = attr
 

	
 

	
 
def get_user_or_error(userid):
 
    """
 
    Get user by id or name or return JsonRPCError if not found
 

	
 
    :param userid:
 
    """
 
    user = UserModel().get_user(userid)
 
    if user is None:
 
        raise JSONRPCError("user `%s` does not exist" % (userid,))
 
    return user
 

	
 

	
 
def get_repo_or_error(repoid):
 
    """
 
    Get repo by id or name or return JsonRPCError if not found
 

	
 
    :param repoid:
 
    """
 
    repo = RepoModel().get_repo(repoid)
 
    if repo is None:
 
        raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
    return repo
 

	
 

	
 
def get_repo_group_or_error(repogroupid):
 
    """
 
    Get repo group by id or name or return JsonRPCError if not found
 

	
 
    :param repogroupid:
 
    """
 
    repo_group = RepoGroup.guess_instance(repogroupid)
 
    if repo_group is None:
 
        raise JSONRPCError(
 
            'repository group `%s` does not exist' % (repogroupid,))
 
    return repo_group
 

	
 

	
 
def get_user_group_or_error(usergroupid):
 
    """
 
    Get user group by id or name or return JsonRPCError if not found
 

	
 
    :param usergroupid:
 
    """
 
    user_group = UserGroupModel().get_group(usergroupid)
 
    if user_group is None:
 
        raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
    return user_group
 

	
 

	
 
def get_perm_or_error(permid, prefix=None):
 
    """
 
    Get permission by id or name or return JsonRPCError if not found
 

	
 
    :param permid:
 
    """
 
    perm = Permission.get_by_key(permid)
 
    if perm is None:
 
        raise JSONRPCError('permission `%s` does not exist' % (permid,))
 
    if prefix:
 
        if not perm.permission_name.startswith(prefix):
 
            raise JSONRPCError('permission `%s` is invalid, '
 
                               'should start with %s' % (permid, prefix))
 
    return perm
 

	
 

	
 
def get_gist_or_error(gistid):
 
    """
 
@@ -2018,404 +2018,396 @@ class ApiController(JSONRPCController):
 
            store_update(updates, description, 'group_description')
 
            store_update(updates, owner, 'owner')
 
            store_update(updates, parent, 'parent_group')
 
            store_update(updates, enable_locking, 'enable_locking')
 
            repo_group = RepoGroupModel().update(repo_group, updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated repository group ID:%s %s' % (repo_group.group_id,
 
                                                           repo_group.group_name),
 
                repo_group=repo_group.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update repository group `%s`'
 
                               % (repogroupid,))
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def delete_repo_group(self, repogroupid):
 
        """
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            'msg': 'deleted repo group ID:<repogroupid> <repogroupname>
 
            'repo_group': null
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete repo group ID:<repogroupid> <repogroupname>"
 
          }
 

	
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        try:
 
            RepoGroupModel().delete(repo_group)
 
            Session().commit()
 
            return dict(
 
                msg='deleted repo group ID:%s %s' %
 
                    (repo_group.group_id, repo_group.group_name),
 
                repo_group=None
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete repo group ID:%s %s' %
 
                               (repo_group.group_id, repo_group.group_name)
 
                               )
 

	
 
    # permission check inside
 
    def grant_user_permission_to_repo_group(self, repogroupid, userid,
 
                                            perm, apply_to_children=Optional('none')):
 
        """
 
        Grant permission for user on given repository group, or update existing
 
        one if found. This command can be executed only using api_key belonging
 
        to user with admin rights, or user who has admin right to given repository
 
        group.
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param userid:
 
        :param perm: (group.(none|read|write|admin))
 
        :type perm: str
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user: `<userid>` in repo group: `<repo_group_name>`"
 
          }
 

	
 
        """
 

	
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
        perm = get_perm_or_error(perm, prefix='group.')
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().add_permission(repo_group=repo_group,
 
                                            obj=user,
 
                                            obj_type="user",
 
                                            perm=perm,
 
                                            recursive=apply_to_children)
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                    perm.permission_name, apply_to_children, user.username, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo group: `%s`' % (
 
                    userid, repo_group.name))
 

	
 
    # permission check inside
 
    def revoke_user_permission_from_repo_group(self, repogroupid, userid,
 
                                               apply_to_children=Optional('none')):
 
        """
 
        Revoke permission for user on given repository group. This command can
 
        be executed only using api_key belonging to user with admin rights, or
 
        user who has admin right to given repository group.
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param userid:
 
        :type userid:
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user: `<userid>` in repo group: `<repo_group_name>`"
 
          }
 

	
 
        """
 

	
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().delete_permission(repo_group=repo_group,
 
                                               obj=user,
 
                                               obj_type="user",
 
                                               recursive=apply_to_children)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                    apply_to_children, user.username, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo group: `%s`' % (
 
                    userid, repo_group.name))
 

	
 
    # permission check inside
 
    def grant_user_group_permission_to_repo_group(
 
            self, repogroupid, usergroupid, perm,
 
            apply_to_children=Optional('none')):
 
        """
 
        Grant permission for user group on given repository group, or update
 
        existing one if found. This command can be executed only using
 
        api_key belonging to user with admin rights, or user who has admin
 
        right to given repository group.
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param usergroupid: id of usergroup
 
        :type usergroupid: str or int
 
        :param perm: (group.(none|read|write|admin))
 
        :type perm: str
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
 
            "success": true
 

	
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user group: `<usergroup>` in repo group: `<repo_group_name>`"
 
          }
 

	
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 
        perm = get_perm_or_error(perm, prefix='group.')
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            _perms = ('group.admin',)
 
            if not HasRepoGroupPermissionAny(*_perms)(
 
                    group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 

	
 
            # check if we have at least read permission for this user group !
 
            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user_group_name=user_group.users_group_name):
 
                raise JSONRPCError(
 
                    'user group `%s` does not exist' % (usergroupid,))
 

	
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().add_permission(repo_group=repo_group,
 
                                            obj=user_group,
 
                                            obj_type="user_group",
 
                                            perm=perm,
 
                                            recursive=apply_to_children)
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
 
                    perm.permission_name, apply_to_children,
 
                    user_group.users_group_name, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in '
 
                'repo group: `%s`' % (
 
                    usergroupid, repo_group.name
 
                )
 
            )
 

	
 
    # permission check inside
 
    def revoke_user_group_permission_from_repo_group(
 
            self, repogroupid, usergroupid,
 
            apply_to_children=Optional('none')):
 
        """
 
        Revoke permission for user group on given repository. This command can be
 
        executed only using api_key belonging to user with admin rights, or
 
        user who has admin right to given repository group.
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param usergroupid:
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user group: `<usergroup>` in repo group: `<repo_group_name>`"
 
          }
 

	
 

	
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            _perms = ('group.admin',)
 
            if not HasRepoGroupPermissionAny(*_perms)(
 
                    group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 

	
 
            # check if we have at least read permission for this user group !
 
            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user_group_name=user_group.users_group_name):
 
                raise JSONRPCError(
 
                    'user group `%s` does not exist' % (usergroupid,))
 

	
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().delete_permission(repo_group=repo_group,
 
                                               obj=user_group,
 
                                               obj_type="user_group",
 
                                               recursive=apply_to_children)
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
 
                    apply_to_children, user_group.users_group_name, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in repo group: `%s`' % (
 
                    user_group.users_group_name, repo_group.name
 
                )
 
            )
 

	
 
    def get_gist(self, gistid):
 
        """
 
        Get given gist by id
 

	
 
        :param gistid: id of private or public gist
 
        :type gistid: str
 
        """
 
        gist = get_gist_or_error(gistid)
 
        if not HasPermissionAny('hg.admin')():
 
            if gist.owner_id != request.authuser.user_id:
 
                raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 
        return gist.get_api_data()
 

	
 
    def get_gists(self, userid=Optional(OAttr('apiuser'))):
 
        """
 
        Get all gists for given user. If userid is empty returned gists
 
        are for user who called the api
 

	
 
        :param userid: user to get gists for
 
        :type userid: Optional(str or int)
 
        """
 
        if not HasPermissionAny('hg.admin')():
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != request.authuser.user_id:
 
                raise JSONRPCError(
 
                    'userid is not the same as your user'
 
                )
 

	
 
        if isinstance(userid, Optional):
 
            user_id = request.authuser.user_id
 
        else:
 
            user_id = get_user_or_error(userid).user_id
 

	
 
        gists = []
 
        _gists = Gist().query() \
 
            .filter(or_(Gist.gist_expires == -1, Gist.gist_expires >= time.time())) \
 
            .filter(Gist.owner_id == user_id) \
 
            .order_by(Gist.created_on.desc())
 
        for gist in _gists:
 
            gists.append(gist.get_api_data())
 
        return gists
 

	
 
    def create_gist(self, files, owner=Optional(OAttr('apiuser')),
 
                    gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1),
 
                    description=Optional('')):
 

	
 
        """
 
        Creates new Gist
 

	
 
        :param files: files to be added to gist
 
            {'filename': {'content':'...', 'lexer': null},
 
             'filename2': {'content':'...', 'lexer': null}}
 
        :type files: dict
 
        :param owner: gist owner, defaults to api method caller
 
        :type owner: Optional(str or int)
 
        :param gist_type: type of gist 'public' or 'private'
 
        :type gist_type: Optional(str)
 
        :param lifetime: time in minutes of gist lifetime
 
        :type lifetime: Optional(int)
 
        :param description: gist description
 
        :type description: Optional(str)
 

	
 
        OUTPUT::
kallithea/controllers/forks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.forks
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
forks controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 23, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import formencode
 
import traceback
 
from formencode import htmlfill
 

	
 
from pylons import tmpl_context as c, request
 
from pylons.i18n.translation import _
 
from webob.exc import HTTPFound
 

	
 
import kallithea.lib.helpers as h
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib.auth import LoginRequired, HasRepoPermissionLevelDecorator, \
 
    NotAnonymous, HasRepoPermissionLevel, HasPermissionAnyDecorator, HasPermissionAny
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import safe_int
 
from kallithea.model.db import Repository, UserFollowing, User, Ui
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.forms import RepoForkForm
 
from kallithea.model.scm import ScmModel, AvailableRepoGroupChoices
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ForksController(BaseRepoController):
 

	
 
    def __before__(self):
 
        super(ForksController, self).__before__()
 

	
 
    def __load_defaults(self):
 
        repo_group_perms = ['group.admin']
 
        if HasPermissionAny('hg.create.write_on_repogroup.true')():
 
            repo_group_perms.append('group.write')
 
        c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perms)
 
            repo_group_perm_level = 'write'
 
        else:
 
            repo_group_perm_level = 'admin'
 
        c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perm_level)
 

	
 
        c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs()
 

	
 
        c.can_update = Ui.get_by_key('hooks', Ui.HOOK_UPDATE).ui_active
 

	
 
    def __load_data(self):
 
        """
 
        Load defaults settings for edit, and update
 
        """
 
        self.__load_defaults()
 

	
 
        c.repo_info = c.db_repo
 
        repo = c.db_repo.scm_instance
 

	
 
        if c.repo_info is None:
 
            h.not_mapped_error(c.repo_name)
 
            raise HTTPFound(location=url('repos'))
 

	
 
        c.default_user_id = User.get_default_user().user_id
 
        c.in_public_journal = UserFollowing.query() \
 
            .filter(UserFollowing.user_id == c.default_user_id) \
 
            .filter(UserFollowing.follows_repository == c.repo_info).scalar()
 

	
 
        if c.repo_info.stats:
 
            last_rev = c.repo_info.stats.stat_on_revision+1
 
        else:
 
            last_rev = 0
 
        c.stats_revision = last_rev
 

	
 
        c.repo_last_rev = repo.count() if repo.revisions else 0
 

	
 
        if last_rev == 0 or c.repo_last_rev == 0:
 
            c.stats_percentage = 0
 
        else:
 
            c.stats_percentage = '%.2f' % ((float((last_rev)) /
 
                                            c.repo_last_rev) * 100)
 

	
 
        defaults = RepoModel()._get_defaults(c.repo_name)
 
        # alter the description to indicate a fork
 
        defaults['description'] = ('fork of repository: %s \n%s'
 
                                   % (defaults['repo_name'],
 
                                      defaults['description']))
 
        # add suffix to fork
 
        defaults['repo_name'] = '%s-fork' % defaults['repo_name']
 

	
 
        return defaults
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def forks(self, repo_name):
 
        p = safe_int(request.GET.get('page'), 1)
 
        repo_id = c.db_repo.repo_id
 
        d = []
 
        for r in Repository.get_repo_forks(repo_id):
 
            if not HasRepoPermissionLevel('read')(r.repo_name, 'get forks check'):
 
                continue
 
            d.append(r)
 
        c.forks_pager = Page(d, page=p, items_per_page=20)
 

	
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            return render('/forks/forks_data.html')
 

	
 
        return render('/forks/forks.html')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
 
    @HasRepoPermissionLevelDecorator('read')
 
    def fork(self, repo_name):
 
        c.repo_info = Repository.get_by_repo_name(repo_name)
 
        if not c.repo_info:
 
            h.not_mapped_error(repo_name)
 
            raise HTTPFound(location=url('home'))
 

	
 
        defaults = self.__load_data()
 

	
 
        return htmlfill.render(
 
            render('forks/fork.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
 
    @HasRepoPermissionLevelDecorator('read')
 
    def fork_create(self, repo_name):
 
        self.__load_defaults()
 
        c.repo_info = Repository.get_by_repo_name(repo_name)
 
        _form = RepoForkForm(old_data={'repo_type': c.repo_info.repo_type},
 
                             repo_groups=c.repo_groups,
 
                             landing_revs=c.landing_revs_choices)()
 
        form_result = {}
 
        task_id = None
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
kallithea/lib/auth.py
Show inline comments
 
@@ -456,192 +456,204 @@ class AuthUser(object):
 
    `AuthUser` does not by itself authenticate users and the constructor
 
    sets the `is_authenticated` field to False. It's up to other parts
 
    of the code to check e.g. if a supplied password is correct, and if
 
    so, set `is_authenticated` to True.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
    "no user". On the other hand, if the default user is disabled (and
 
    there is no login information), we instead get "no user"; this should
 
    only happen on the login page (as all other requests are redirected).
 

	
 
    `is_default_user` specifically checks if the AuthUser is the user named
 
    "default". Use `is_anonymous` to check for both "default" and "no user".
 
    """
 

	
 
    def __init__(self, user_id=None, dbuser=None, authenticating_api_key=None,
 
            is_external_auth=False):
 

	
 
        self.is_authenticated = False
 
        self.is_external_auth = is_external_auth
 
        self.authenticating_api_key = authenticating_api_key
 

	
 
        user_model = UserModel()
 
        self._default_user = User.get_default_user(cache=True)
 

	
 
        # These attributes will be overridden by fill_data, below, unless the
 
        # requested user cannot be found and the default anonymous user is
 
        # not enabled.
 
        self.user_id = None
 
        self.username = None
 
        self.api_key = None
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.admin = False
 
        self.inherit_default_permissions = False
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            log.debug('Auth User lookup by USER ID %s', user_id)
 
            dbuser = user_model.get(user_id)
 
        else:
 
            # Note: dbuser is allowed to be None.
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
        is_user_loaded = self._fill_data(dbuser)
 

	
 
        # If user cannot be found, try falling back to anonymous.
 
        if not is_user_loaded:
 
            is_user_loaded =  self._fill_data(self._default_user)
 

	
 
        self.is_default_user = (self.user_id == self._default_user.user_id)
 
        self.is_anonymous = not is_user_loaded or self.is_default_user
 

	
 
        if not self.username:
 
            self.username = 'None'
 

	
 
        log.debug('Auth User is now %s', self)
 

	
 
    def _fill_data(self, dbuser):
 
        """
 
        Copies database fields from a `db.User` to this `AuthUser`. Does
 
        not copy `api_keys` and `permissions` attributes.
 

	
 
        Checks that `dbuser` is `active` (and not None) before copying;
 
        returns True on success.
 
        """
 
        if dbuser is not None and dbuser.active:
 
            log.debug('filling %s data', dbuser)
 
            for k, v in dbuser.get_dict().iteritems():
 
                assert k not in ['api_keys', 'permissions']
 
                setattr(self, k, v)
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def permissions(self):
 
        return self.__get_perms(user=self, cache=False)
 

	
 
    def has_repository_permission_level(self, repo_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['repository.read', 'repository.write', 'repository.admin'],
 
            'write': ['repository.write', 'repository.admin'],
 
            'admin': ['repository.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories'].get(repo_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
 
            self.username, level, repo_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['group.read', 'group.write', 'group.admin'],
 
            'write': ['group.write', 'group.admin'],
 
            'admin': ['group.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories_groups'].get(repo_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
 
            self.username, level, repo_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    @property
 
    def api_keys(self):
 
        return self._get_api_keys()
 

	
 
    def __get_perms(self, user, explicit=True, algo='higherwin', cache=False):
 
        """
 
        Fills user permission attribute with permissions taken from database
 
        works for permissions given for repositories, and for permissions that
 
        are granted to groups
 

	
 
        :param user: `AuthUser` instance
 
        :param explicit: In case there are permissions both for user and a group
 
            that user is part of, explicit flag will define if user will
 
            explicitly override permissions from group, if it's False it will
 
            make decision based on the algo
 
        :param algo: algorithm to decide what permission should be choose if
 
            it's multiple defined, eg user in two different groups. It also
 
            decides if explicit flag is turned off how to specify the permission
 
            for case when user is in a group + have defined separate permission
 
        """
 
        user_id = user.user_id
 
        user_is_admin = user.is_admin
 
        user_inherit_default_permissions = user.inherit_default_permissions
 

	
 
        log.debug('Getting PERMISSION tree')
 
        compute = conditional_cache('short_term', 'cache_desc',
 
                                    condition=cache, func=_cached_perms_data)
 
        return compute(user_id, user_is_admin,
 
                       user_inherit_default_permissions, explicit, algo)
 

	
 
    def _get_api_keys(self):
 
        api_keys = [self.api_key]
 
        for api_key in UserApiKeys.query() \
 
                .filter(UserApiKeys.user_id == self.user_id) \
 
                .filter(or_(UserApiKeys.expires == -1,
 
                            UserApiKeys.expires >= time.time())).all():
 
            api_keys.append(api_key.api_key)
 

	
 
        return api_keys
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @property
 
    def repositories_admin(self):
 
        """
 
        Returns list of repositories you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories'].iteritems()
 
                if x[1] == 'repository.admin']
 

	
 
    @property
 
    def repository_groups_admin(self):
 
        """
 
        Returns list of repository groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories_groups'].iteritems()
 
                if x[1] == 'group.admin']
 

	
 
    @property
 
    def user_groups_admin(self):
 
        """
 
        Returns list of user groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['user_groups'].iteritems()
 
                if x[1] == 'usergroup.admin']
 

	
 
    @staticmethod
 
    def check_ip_allowed(user, ip_addr):
 
        """
 
        Check if the given IP address (a `str`) is allowed for the given
 
        user (an `AuthUser` or `db.User`).
 
        """
 
        allowed_ips = AuthUser.get_allowed_ips(user.user_id, cache=True,
 
            inherit_from_default=user.inherit_default_permissions)
 
        if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
 
            log.debug('IP:%s is in range of %s', ip_addr, allowed_ips)
 
            return True
 
        else:
 
            log.info('Access for IP:%s forbidden, '
 
                     'not in %s' % (ip_addr, allowed_ips))
 
            return False
 

	
 
    def __repr__(self):
 
        return "<AuthUser('id:%s[%s] auth:%s')>" \
 
            % (self.user_id, self.username, (self.is_authenticated or self.is_default_user))
 

	
 
    def to_cookie(self):
 
        """ Serializes this login session to a cookie `dict`. """
 
        return {
 
            'user_id': self.user_id,
 
            'is_external_auth': self.is_external_auth,
 
        }
 

	
 
    @staticmethod
 
@@ -766,227 +778,219 @@ class LoginRequired(object):
 
            return func(*fargs, **fkwargs)
 

	
 
        # CSRF protection: Whenever a request has ambient authority (whether
 
        # through a session cookie or its origin IP address), it must include
 
        # the correct token, unless the HTTP method is GET or HEAD (and thus
 
        # guaranteed to be side effect free. In practice, the only situation
 
        # where we allow side effects without ambient authority is when the
 
        # authority comes from an API key; and that is handled above.
 
        if request.method not in ['GET', 'HEAD']:
 
            token = request.POST.get(secure_form.token_key)
 
            if not token or token != secure_form.authentication_token():
 
                log.error('CSRF check failed')
 
                raise HTTPForbidden()
 

	
 
        # regular user authentication
 
        if user.is_authenticated or user.is_default_user:
 
            log.info('user %s authenticated with regular auth @ %s', user, loc)
 
            return func(*fargs, **fkwargs)
 
        else:
 
            log.warning('user %s NOT authenticated with regular auth @ %s', user, loc)
 
            raise _redirect_to_login()
 

	
 

	
 
# Use as decorator
 
class NotAnonymous(object):
 
    """Ensures that client is not logged in as the "default" user, and
 
    redirects to the login page otherwise. Must be used together with
 
    LoginRequired."""
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        user = request.authuser
 

	
 
        log.debug('Checking that user %s is not anonymous @%s', user.username, cls)
 

	
 
        if user.is_default_user:
 
            raise _redirect_to_login(_('You need to be a registered user to '
 
                                       'perform this action'))
 
        else:
 
            return func(*fargs, **fkwargs)
 

	
 

	
 
class _PermsDecorator(object):
 
    """Base class for controller decorators"""
 

	
 
    def __init__(self, *required_perms):
 
        self.required_perms = required_perms # usually very short - a list is thus fine
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        user = request.authuser
 
        log.debug('checking %s permissions %s for %s %s',
 
          self.__class__.__name__, self.required_perms, cls, user)
 

	
 
        if self.check_permissions(user):
 
            log.debug('Permission granted for %s %s', cls, user)
 
            return func(*fargs, **fkwargs)
 

	
 
        else:
 
            log.debug('Permission denied for %s %s', cls, user)
 
            if user.is_default_user:
 
                raise _redirect_to_login(_('You need to be signed in to view this page'))
 
            else:
 
                raise HTTPForbidden()
 

	
 
    def check_permissions(self, user):
 
        raise NotImplementedError()
 

	
 

	
 
class HasPermissionAnyDecorator(_PermsDecorator):
 
    """
 
    Checks the user has any of the given global permissions.
 
    """
 

	
 
    def check_permissions(self, user):
 
        global_permissions = user.permissions['global'] # usually very short
 
        return any(p in global_permissions for p in self.required_perms)
 

	
 

	
 
class HasRepoPermissionLevelDecorator(_PermsDecorator):
 
    """
 
    Checks the user has at least the specified permission level for the requested repository.
 
    """
 

	
 
    def check_permissions(self, user):
 
        repo_name = get_repo_slug(request)
 
        (level,) = self.required_perms
 
        return user.has_repository_permission_level(repo_name, level)
 

	
 

	
 
class HasRepoGroupPermissionAnyDecorator(_PermsDecorator):
 
class HasRepoGroupPermissionLevelDecorator(_PermsDecorator):
 
    """
 
    Checks the user has any of given permissions for the requested repository group.
 
    """
 

	
 
    def check_permissions(self, user):
 
        repo_group_name = get_repo_group_slug(request)
 
        try:
 
            return user.permissions['repositories_groups'][repo_group_name] in self.required_perms
 
        except KeyError:
 
            return False
 
        (level,) = self.required_perms
 
        return user.has_repository_group_permission_level(repo_group_name, level)
 

	
 

	
 
class HasUserGroupPermissionAnyDecorator(_PermsDecorator):
 
    """
 
    Checks for access permission for any of given predicates for specific
 
    user group. In order to fulfill the request any of predicates must be meet
 
    """
 

	
 
    def check_permissions(self, user):
 
        user_group_name = get_user_group_slug(request)
 
        try:
 
            return user.permissions['user_groups'][user_group_name] in self.required_perms
 
        except KeyError:
 
            return False
 

	
 

	
 
#==============================================================================
 
# CHECK FUNCTIONS
 
#==============================================================================
 

	
 
class _PermsFunction(object):
 
    """Base function for other check functions"""
 

	
 
    def __init__(self, *required_perms):
 
        self.required_perms = required_perms # usually very short - a list is thus fine
 

	
 
    def __nonzero__(self):
 
        """ Defend against accidentally forgetting to call the object
 
            and instead evaluating it directly in a boolean context,
 
            which could have security implications.
 
        """
 
        raise AssertionError(self.__class__.__name__ + ' is not a bool and must be called!')
 

	
 
    def __call__(self, *a, **b):
 
        raise NotImplementedError()
 

	
 

	
 
class HasPermissionAny(_PermsFunction):
 

	
 
    def __call__(self, purpose=None):
 
        global_permissions = request.user.permissions['global'] # usually very short
 
        ok = any(p in global_permissions for p in self.required_perms)
 

	
 
        log.debug('Check %s for global %s (%s): %s' %
 
            (request.user.username, self.required_perms, purpose, ok))
 
        return ok
 

	
 

	
 
class HasRepoPermissionLevel(_PermsFunction):
 

	
 
    def __call__(self, repo_name, purpose=None):
 
        (level,) = self.required_perms
 
        return request.user.has_repository_permission_level(repo_name, level, purpose)
 

	
 

	
 
class HasRepoGroupPermissionAny(_PermsFunction):
 
class HasRepoGroupPermissionLevel(_PermsFunction):
 

	
 
    def __call__(self, group_name, purpose=None):
 
        try:
 
            ok = request.user.permissions['repositories_groups'][group_name] in self.required_perms
 
        except KeyError:
 
            ok = False
 

	
 
        log.debug('Check %s for %s for repo group %s (%s): %s' %
 
            (request.user.username, self.required_perms, group_name, purpose, ok))
 
        return ok
 
        (level,) = self.required_perms
 
        return request.user.has_repository_group_permission_level(group_name, level, purpose)
 

	
 

	
 
class HasUserGroupPermissionAny(_PermsFunction):
 

	
 
    def __call__(self, user_group_name, purpose=None):
 
        try:
 
            ok = request.user.permissions['user_groups'][user_group_name] in self.required_perms
 
        except KeyError:
 
            ok = False
 

	
 
        log.debug('Check %s %s for user group %s (%s): %s' %
 
            (request.user.username, self.required_perms, user_group_name, purpose, ok))
 
        return ok
 

	
 

	
 
#==============================================================================
 
# SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
 
#==============================================================================
 

	
 
class HasPermissionAnyMiddleware(object):
 
    def __init__(self, *perms):
 
        self.required_perms = set(perms)
 

	
 
    def __call__(self, user, repo_name, purpose=None):
 
        # repo_name MUST be unicode, since we handle keys in ok
 
        # dict by unicode
 
        repo_name = safe_unicode(repo_name)
 
        user = AuthUser(user.user_id)
 

	
 
        try:
 
            ok = user.permissions['repositories'][repo_name] in self.required_perms
 
        except KeyError:
 
            ok = False
 

	
 
        log.debug('Middleware check %s for %s for repo %s (%s): %s' % (user.username, self.required_perms, repo_name, purpose, ok))
 
        return ok
 

	
 

	
 
def check_ip_access(source_ip, allowed_ips=None):
 
    """
 
    Checks if source_ip is a subnet of any of allowed_ips.
 

	
 
    :param source_ip:
 
    :param allowed_ips: list of allowed ips together with mask
 
    """
 
    from kallithea.lib import ipaddr
 
    log.debug('checking if ip:%s is subnet of %s', source_ip, allowed_ips)
 
    if isinstance(allowed_ips, (tuple, list, set)):
 
        for ip in allowed_ips:
 
            if ipaddr.IPAddress(source_ip) in ipaddr.IPNetwork(ip):
 
                log.debug('IP %s is network %s',
 
                          ipaddr.IPAddress(source_ip), ipaddr.IPNetwork(ip))
 
                return True
 
    return False
kallithea/lib/helpers.py
Show inline comments
 
@@ -685,193 +685,193 @@ def action_parser(user_log, feed=False, 
 
            repo_name = user_log.repository_name
 
        else:
 
            repo_name = user_log.repository.repo_name
 

	
 
        return link_to(_('Pull request %s') % nice_id,
 
                    url('pullrequest_show', repo_name=repo_name,
 
                    pull_request_id=pull_request_id))
 

	
 
    def get_archive_name():
 
        archive_name = action_params
 
        return archive_name
 

	
 
    # action : translated str, callback(extractor), icon
 
    action_map = {
 
    'user_deleted_repo':           (_('[deleted] repository'),
 
                                    None, 'icon-trashcan'),
 
    'user_created_repo':           (_('[created] repository'),
 
                                    None, 'icon-plus'),
 
    'user_created_fork':           (_('[created] repository as fork'),
 
                                    None, 'icon-fork'),
 
    'user_forked_repo':            (_('[forked] repository'),
 
                                    get_fork_name, 'icon-fork'),
 
    'user_updated_repo':           (_('[updated] repository'),
 
                                    None, 'icon-pencil'),
 
    'user_downloaded_archive':      (_('[downloaded] archive from repository'),
 
                                    get_archive_name, 'icon-download-cloud'),
 
    'admin_deleted_repo':          (_('[delete] repository'),
 
                                    None, 'icon-trashcan'),
 
    'admin_created_repo':          (_('[created] repository'),
 
                                    None, 'icon-plus'),
 
    'admin_forked_repo':           (_('[forked] repository'),
 
                                    None, 'icon-fork'),
 
    'admin_updated_repo':          (_('[updated] repository'),
 
                                    None, 'icon-pencil'),
 
    'admin_created_user':          (_('[created] user'),
 
                                    get_user_name, 'icon-user'),
 
    'admin_updated_user':          (_('[updated] user'),
 
                                    get_user_name, 'icon-user'),
 
    'admin_created_users_group':   (_('[created] user group'),
 
                                    get_users_group, 'icon-pencil'),
 
    'admin_updated_users_group':   (_('[updated] user group'),
 
                                    get_users_group, 'icon-pencil'),
 
    'user_commented_revision':     (_('[commented] on revision in repository'),
 
                                    get_cs_links, 'icon-comment'),
 
    'user_commented_pull_request': (_('[commented] on pull request for'),
 
                                    get_pull_request, 'icon-comment'),
 
    'user_closed_pull_request':    (_('[closed] pull request for'),
 
                                    get_pull_request, 'icon-ok'),
 
    'push':                        (_('[pushed] into'),
 
                                    get_cs_links, 'icon-move-up'),
 
    'push_local':                  (_('[committed via Kallithea] into repository'),
 
                                    get_cs_links, 'icon-pencil'),
 
    'push_remote':                 (_('[pulled from remote] into repository'),
 
                                    get_cs_links, 'icon-move-up'),
 
    'pull':                        (_('[pulled] from'),
 
                                    None, 'icon-move-down'),
 
    'started_following_repo':      (_('[started following] repository'),
 
                                    None, 'icon-heart'),
 
    'stopped_following_repo':      (_('[stopped following] repository'),
 
                                    None, 'icon-heart-empty'),
 
    }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0] \
 
            .replace('[', '<b>') \
 
            .replace(']', '</b>')
 

	
 
    action_params_func = lambda: ""
 

	
 
    if callable(action_str[1]):
 
        action_params_func = action_str[1]
 

	
 
    def action_parser_icon():
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        html = """<i class="%s"></i>""" % ico
 
        return literal(html)
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 

	
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, \
 
    HasRepoPermissionLevel, HasRepoGroupPermissionAny
 
    HasRepoPermissionLevel, HasRepoGroupPermissionLevel
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 
def gravatar_div(email_address, cls='', size=30, **div_attributes):
 
    """Return an html literal with a div around a gravatar if they are enabled.
 
    Extra keyword parameters starting with 'div_' will get the prefix removed
 
    and '_' changed to '-' and be used as attributes on the div. The default
 
    class is 'gravatar'.
 
    """
 
    from pylons import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 
    if 'div_class' not in div_attributes:
 
        div_attributes['div_class'] = "gravatar"
 
    attributes = []
 
    for k, v in sorted(div_attributes.items()):
 
        assert k.startswith('div_'), k
 
        attributes.append(' %s="%s"' % (k[4:].replace('_', '-'), escape(v)))
 
    return literal("""<div%s>%s</div>""" %
 
                   (''.join(attributes),
 
                    gravatar(email_address, cls=cls, size=size)))
 

	
 
def gravatar(email_address, cls='', size=30):
 
    """return html element of the gravatar
 

	
 
    This method will return an <img> with the resolution double the size (for
 
    retina screens) of the image. If the url returned from gravatar_url is
 
    empty then we fallback to using an icon.
 

	
 
    """
 
    from pylons import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 

	
 
    src = gravatar_url(email_address, size * 2)
 

	
 
    if src:
 
        # here it makes sense to use style="width: ..." (instead of, say, a
 
        # stylesheet) because we using this to generate a high-res (retina) size
 
        html = ('<img alt="" class="{cls}" style="width: {size}px; height: {size}px" src="{src}"/>'
 
            .format(cls=cls, size=size, src=src))
 

	
 
    else:
 
        # if src is empty then there was no gravatar, so we use a font icon
 
        html = ("""<i class="icon-user {cls}" style="font-size: {size}px;"></i>"""
 
            .format(cls=cls, size=size, src=src))
 

	
 
    return literal(html)
 

	
 
def gravatar_url(email_address, size=30, default=''):
 
    # doh, we need to re-import those to mock it later
 
    from kallithea.config.routing import url
 
    from kallithea.model.db import User
 
    from pylons import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ""
 

	
 
    _def = 'anonymous@kallithea-scm.org'  # default gravatar
 
    email_address = email_address or _def
 

	
 
    if email_address == _def:
 
        return default
 

	
 
    parsed_url = urlparse.urlparse(url.current(qualified=True))
 
    url = (c.visual.gravatar_url or User.DEFAULT_GRAVATAR_URL ) \
 
               .replace('{email}', email_address) \
 
               .replace('{md5email}', hashlib.md5(safe_str(email_address).lower()).hexdigest()) \
 
               .replace('{netloc}', parsed_url.netloc) \
 
               .replace('{scheme}', parsed_url.scheme) \
 
               .replace('{size}', safe_str(size))
 
    return url
 

	
 

	
 
def changed_tooltip(nodes):
 
    """
 
    Generates a html string for changed nodes in changeset page.
 
    It limits the output to 30 entries
 

	
 
    :param nodes: LazyNodesGenerator
 
    """
 
    if nodes:
 
        pref = ': <br/> '
 
        suf = ''
 
        if len(nodes) > 30:
 
            suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
 
        return literal(pref + '<br/> '.join([safe_unicode(x.path)
 
                                             for x in nodes[:30]]) + suf)
 
    else:
 
        return ': ' + _('No files')
 

	
 

	
 
def fancy_file_stats(stats):
 
    """
 
    Displays a fancy two colored bar for number of added/deleted
kallithea/model/scm.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.scm
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Scm model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import posixpath
 
import re
 
import time
 
import traceback
 
import logging
 
import cStringIO
 
import pkg_resources
 

	
 
from sqlalchemy import func
 
from pylons.i18n.translation import _
 

	
 
import kallithea
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea import BACKENDS
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url, \
 
    _set_extras
 
from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionAny, \
 
from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionLevel, \
 
    HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAny
 
from kallithea.lib.utils import get_filesystem_repos, make_ui, \
 
    action_logger
 
from kallithea.model.base import BaseModel
 
from kallithea.model.db import Repository, Ui, CacheInvalidation, \
 
    UserFollowing, UserLog, User, RepoGroup, PullRequest
 
from kallithea.lib.hooks import log_push_action
 
from kallithea.lib.exceptions import NonRelativePathError, IMCCommitError
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserTemp(object):
 
    def __init__(self, user_id):
 
        self.user_id = user_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.user_id)
 

	
 

	
 
class RepoTemp(object):
 
    def __init__(self, repo_id):
 
        self.repo_id = repo_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.repo_id)
 

	
 

	
 
class _PermCheckIterator(object):
 
    def __init__(self, obj_list, obj_attr, perm_set, perm_checker, extra_kwargs=None):
 
        """
 
        Creates iterator from given list of objects, additionally
 
        checking permission for them from perm_set var
 

	
 
        :param obj_list: list of db objects
 
        :param obj_attr: attribute of object to pass into perm_checker
 
        :param perm_set: list of permissions to check
 
        :param perm_checker: callable to check permissions against
 
        """
 
        self.obj_list = obj_list
 
        self.obj_attr = obj_attr
 
        self.perm_set = perm_set
 
        self.perm_checker = perm_checker
 
        self.extra_kwargs = extra_kwargs or {}
 

	
 
    def __len__(self):
 
        return len(self.obj_list)
 

	
 
    def __repr__(self):
 
        return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
 

	
 
    def __iter__(self):
 
        for db_obj in self.obj_list:
 
            # check permission at this level
 
            name = getattr(db_obj, self.obj_attr, None)
 
            if not self.perm_checker(*self.perm_set)(
 
                    name, self.__class__.__name__, **self.extra_kwargs):
 
                continue
 

	
 
            yield db_obj
 

	
 

	
 
class RepoList(_PermCheckIterator):
 

	
 
    def __init__(self, db_repo_list, perm_level, extra_kwargs=None):
 
        super(RepoList, self).__init__(obj_list=db_repo_list,
 
                    obj_attr='repo_name', perm_set=[perm_level],
 
                    perm_checker=HasRepoPermissionLevel,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class RepoGroupList(_PermCheckIterator):
 

	
 
    def __init__(self, db_repo_group_list, perm_set=None, extra_kwargs=None):
 
        if not perm_set:
 
            perm_set = ['group.read', 'group.write', 'group.admin']
 

	
 
    def __init__(self, db_repo_group_list, perm_level, extra_kwargs=None):
 
        super(RepoGroupList, self).__init__(obj_list=db_repo_group_list,
 
                    obj_attr='group_name', perm_set=perm_set,
 
                    perm_checker=HasRepoGroupPermissionAny,
 
                    obj_attr='group_name', perm_set=[perm_level],
 
                    perm_checker=HasRepoGroupPermissionLevel,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class UserGroupList(_PermCheckIterator):
 

	
 
    def __init__(self, db_user_group_list, perm_set=None, extra_kwargs=None):
 
        if not perm_set:
 
            perm_set = ['usergroup.read', 'usergroup.write', 'usergroup.admin']
 

	
 
        super(UserGroupList, self).__init__(obj_list=db_user_group_list,
 
                    obj_attr='users_group_name', perm_set=perm_set,
 
                    perm_checker=HasUserGroupPermissionAny,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
class ScmModel(BaseModel):
 
    """
 
    Generic Scm Model
 
    """
 

	
 
    def __get_repo(self, instance):
 
        cls = Repository
 
        if isinstance(instance, cls):
 
            return instance
 
        elif isinstance(instance, int) or safe_str(instance).isdigit():
 
            return cls.get(instance)
 
        elif isinstance(instance, basestring):
 
            return cls.get_by_repo_name(instance)
 
        elif instance is not None:
 
            raise Exception('given object must be int, basestr or Instance'
 
                            ' of %s got %s' % (type(cls), type(instance)))
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = self.sa.query(Ui).filter(Ui.ui_key == '/').one()
 

	
 
        return q.ui_value
 

	
 
    def repo_scan(self, repos_path=None):
 
        """
 
        Listing of repositories in given path. This path should not be a
 
        repository itself. Return a dictionary of repository objects
 

	
 
        :param repos_path: path to directory containing repositories
 
        """
 

	
 
        if repos_path is None:
 
            repos_path = self.repos_path
 

	
 
        log.info('scanning for repositories in %s', repos_path)
 

	
 
        baseui = make_ui('db')
 
        repos = {}
 

	
 
        for name, path in get_filesystem_repos(repos_path):
 
            # name need to be decomposed and put back together using the /
 
            # since this is internal storage separator for kallithea
 
            name = Repository.normalize_repo_name(name)
 

	
 
            try:
 
                if name in repos:
 
                    raise RepositoryError('Duplicate repository name %s '
 
                                          'found in %s' % (name, path))
 
                else:
 

	
 
                    klass = get_backend(path[0])
 

	
 
                    if path[0] == 'hg' and path[0] in BACKENDS.keys():
 
                        repos[name] = klass(safe_str(path[1]), baseui=baseui)
 

	
 
                    if path[0] == 'git' and path[0] in BACKENDS.keys():
 
                        repos[name] = klass(path[1])
 
            except OSError:
 
                continue
 
        log.debug('found %s paths with repositories', len(repos))
 
        return repos
 

	
 
    def get_repos(self, repos):
 
        """Return the repos the user has access to"""
 
        return RepoList(repos, perm_level='read')
 

	
 
    def get_repo_groups(self, groups=None):
 
        """Return the repo groups the user has access to
 
        If no groups are specified, use top level groups.
 
        """
 
        if groups is None:
 
            groups = RepoGroup.query() \
 
                .filter(RepoGroup.parent_group_id == None).all()
 
        return RepoGroupList(groups)
 
        return RepoGroupList(groups, perm_level='read')
 

	
 
    def mark_for_invalidation(self, repo_name):
 
        """
 
        Mark caches of this repo invalid in the database.
 

	
 
        :param repo_name: the repo for which caches should be marked invalid
 
        """
 
        CacheInvalidation.set_invalidate(repo_name)
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo is not None:
 
            repo.update_changeset_cache()
 

	
 
    def toggle_following_repo(self, follow_repo_id, user_id):
 

	
 
        f = self.sa.query(UserFollowing) \
 
            .filter(UserFollowing.follows_repository_id == follow_repo_id) \
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        if f is not None:
 
            try:
 
                self.sa.delete(f)
 
                action_logger(UserTemp(user_id),
 
                              'stopped_following_repo',
 
                              RepoTemp(follow_repo_id))
 
                return
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise
 

	
 
        try:
 
            f = UserFollowing()
 
            f.user_id = user_id
 
            f.follows_repository_id = follow_repo_id
 
            self.sa.add(f)
 

	
 
            action_logger(UserTemp(user_id),
 
                          'started_following_repo',
 
                          RepoTemp(follow_repo_id))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def toggle_following_user(self, follow_user_id, user_id):
 
        f = self.sa.query(UserFollowing) \
 
            .filter(UserFollowing.follows_user_id == follow_user_id) \
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        if f is not None:
 
            try:
 
                self.sa.delete(f)
 
                return
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise
 

	
 
        try:
 
            f = UserFollowing()
 
            f.user_id = user_id
 
            f.follows_user_id = follow_user_id
 
            self.sa.add(f)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def is_following_repo(self, repo_name, user_id, cache=False):
 
        r = self.sa.query(Repository) \
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        f = self.sa.query(UserFollowing) \
 
            .filter(UserFollowing.follows_repository == r) \
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        return f is not None
 

	
 
    def is_following_user(self, username, user_id, cache=False):
 
        u = User.get_by_username(username)
 

	
 
        f = self.sa.query(UserFollowing) \
 
            .filter(UserFollowing.follows_user == u) \
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        return f is not None
 

	
 
    def get_followers(self, repo):
 
        repo = Repository.guess_instance(repo)
 

	
 
        return self.sa.query(UserFollowing) \
 
                .filter(UserFollowing.follows_repository == repo).count()
 

	
 
    def get_forks(self, repo):
 
        repo = Repository.guess_instance(repo)
 
        return self.sa.query(Repository) \
 
                .filter(Repository.fork == repo).count()
 

	
 
    def get_pull_requests(self, repo):
 
        repo = Repository.guess_instance(repo)
 
@@ -691,113 +688,113 @@ class ScmModel(BaseModel):
 
    def get_unread_journal(self):
 
        return self.sa.query(UserLog).count()
 

	
 
    def get_repo_landing_revs(self, repo=None):
 
        """
 
        Generates select option with tags branches and bookmarks (for hg only)
 
        grouped by type
 

	
 
        :param repo:
 
        """
 

	
 
        hist_l = []
 
        choices = []
 
        repo = self.__get_repo(repo)
 
        hist_l.append(['rev:tip', _('latest tip')])
 
        choices.append('rev:tip')
 
        if repo is None:
 
            return choices, hist_l
 

	
 
        repo = repo.scm_instance
 

	
 
        branches_group = ([(u'branch:%s' % k, k) for k, v in
 
                           repo.branches.iteritems()], _("Branches"))
 
        hist_l.append(branches_group)
 
        choices.extend([x[0] for x in branches_group[0]])
 

	
 
        if repo.alias == 'hg':
 
            bookmarks_group = ([(u'book:%s' % k, k) for k, v in
 
                                repo.bookmarks.iteritems()], _("Bookmarks"))
 
            hist_l.append(bookmarks_group)
 
            choices.extend([x[0] for x in bookmarks_group[0]])
 

	
 
        tags_group = ([(u'tag:%s' % k, k) for k, v in
 
                       repo.tags.iteritems()], _("Tags"))
 
        hist_l.append(tags_group)
 
        choices.extend([x[0] for x in tags_group[0]])
 

	
 
        return choices, hist_l
 

	
 
    def install_git_hooks(self, repo, force_create=False):
 
        """
 
        Creates a kallithea hook inside a git repository
 

	
 
        :param repo: Instance of VCS repo
 
        :param force_create: Create even if same name hook exists
 
        """
 

	
 
        loc = os.path.join(repo.path, 'hooks')
 
        if not repo.bare:
 
            loc = os.path.join(repo.path, '.git', 'hooks')
 
        if not os.path.isdir(loc):
 
            os.makedirs(loc)
 

	
 
        tmpl_post = "#!/usr/bin/env %s\n" % sys.executable or 'python2'
 
        tmpl_post += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'post_receive_tmpl.py')
 
        )
 
        tmpl_pre = "#!/usr/bin/env %s\n" % sys.executable or 'python2'
 
        tmpl_pre += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'pre_receive_tmpl.py')
 
        )
 

	
 
        for h_type, tmpl in [('pre', tmpl_pre), ('post', tmpl_post)]:
 
            _hook_file = os.path.join(loc, '%s-receive' % h_type)
 
            has_hook = False
 
            log.debug('Installing git hook in repo %s', repo)
 
            if os.path.exists(_hook_file):
 
                # let's take a look at this hook, maybe it's kallithea ?
 
                log.debug('hook exists, checking if it is from kallithea')
 
                with open(_hook_file, 'rb') as f:
 
                    data = f.read()
 
                    matches = re.compile(r'(?:%s)\s*=\s*(.*)'
 
                                         % 'KALLITHEA_HOOK_VER').search(data)
 
                    if matches:
 
                        try:
 
                            ver = matches.groups()[0]
 
                            log.debug('got %s it is kallithea', ver)
 
                            has_hook = True
 
                        except Exception:
 
                            log.error(traceback.format_exc())
 
            else:
 
                # there is no hook in this dir, so we want to create one
 
                has_hook = True
 

	
 
            if has_hook or force_create:
 
                log.debug('writing %s hook file !', h_type)
 
                try:
 
                    with open(_hook_file, 'wb') as f:
 
                        tmpl = tmpl.replace('_TMPL_', kallithea.__version__)
 
                        f.write(tmpl)
 
                    os.chmod(_hook_file, 0755)
 
                except IOError as e:
 
                    log.error('error writing %s: %s', _hook_file, e)
 
            else:
 
                log.debug('skipping writing hook file')
 

	
 
def AvailableRepoGroupChoices(top_perms, repo_group_perms, extras=()):
 
def AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras=()):
 
    """Return group_id,string tuples with choices for all the repo groups where
 
    the user has the necessary permissions.
 

	
 
    Top level is -1.
 
    """
 
    groups = RepoGroup.query().all()
 
    if HasPermissionAny('hg.admin')('available repo groups'):
 
        groups.append(None)
 
    else:
 
        groups = list(RepoGroupList(groups, perm_set=repo_group_perms))
 
        groups = list(RepoGroupList(groups, perm_level=repo_group_perm_level))
 
        if top_perms and HasPermissionAny(*top_perms)('available repo groups'):
 
            groups.append(None)
 
        for extra in extras:
 
            if not any(rg == extra for rg in groups):
 
                groups.append(extra)
 
    return RepoGroup.groups_choices(groups=groups)
kallithea/model/validators.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Set of generic validators
 
"""
 

	
 
import os
 
import re
 
import formencode
 
import logging
 
from collections import defaultdict
 
from pylons.i18n.translation import _
 
from webhelpers.pylonslib.secure_form import authentication_token
 
import sqlalchemy
 

	
 
from formencode.validators import (
 
    UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 
)
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib import ipaddr
 
from kallithea.lib.utils import repo_name_slug
 
from kallithea.lib.utils2 import str2bool, aslist
 
from kallithea.model.db import RepoGroup, Repository, UserGroup, User
 
from kallithea.lib.exceptions import LdapImportError
 
from kallithea.config.routing import ADMIN_PREFIX
 
from kallithea.lib.auth import HasRepoGroupPermissionAny, HasPermissionAny
 
from kallithea.lib.auth import HasRepoGroupPermissionLevel, HasPermissionAny
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def UniqueListFromString():
 
    class _UniqueListFromString(formencode.FancyValidator):
 
        """
 
        Split value on ',' and make unique while preserving order
 
        """
 
        messages = dict(
 
            empty=_('Value cannot be an empty list'),
 
            missing_value=_('Value cannot be an empty list'),
 
        )
 

	
 
        def _to_python(self, value, state):
 
            value = aslist(value, ',')
 
            seen = set()
 
            return [c for c in value if not (c in seen or seen.add(c))]
 

	
 
        def empty_value(self, value):
 
            return []
 

	
 
    return _UniqueListFromString
 

	
 

	
 
def ValidUsername(edit=False, old_data=None):
 
    old_data = old_data or {}
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'username_exists': _('Username "%(username)s" already exists'),
 
            'system_invalid_username':
 
                _('Username "%(username)s" cannot be used'),
 
            'invalid_username':
 
                _('Username may only contain alphanumeric characters '
 
                  'underscores, periods or dashes and must begin with an '
 
                  'alphanumeric character or underscore')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value in ['default', 'new_user']:
 
                msg = self.message('system_invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state)
 
            #check if user is unique
 
            old_un = None
 
            if edit:
 
                old_un = User.get(old_data.get('user_id')).username
 

	
 
            if old_un != value or not edit:
 
                if User.get_by_username(value, case_insensitive=True):
 
                    msg = self.message('username_exists', state, username=value)
 
                    raise formencode.Invalid(msg, value, state)
 

	
 
            if re.match(r'^[a-zA-Z0-9\_]{1}[a-zA-Z0-9\-\_\.]*$', value) is None:
 
                msg = self.message('invalid_username', state)
 
                raise formencode.Invalid(msg, value, state)
 
    return _validator
 

	
 

	
 
def ValidRegex(msg=None):
 
    class _validator(formencode.validators.Regex):
 
        messages = dict(invalid=msg or _('The input is not valid'))
 
    return _validator
 

	
 

	
 
def ValidRepoUser():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_username': _('Username %(username)s is not valid')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                User.query().filter(User.active == True) \
 
                    .filter(User.username == value).one()
 
            except sqlalchemy.exc.InvalidRequestError: # NoResultFound/MultipleResultsFound
 
                msg = self.message('invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(username=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidUserGroup(edit=False, old_data=None):
 
    old_data = old_data or {}
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_group': _('Invalid user group name'),
 
            'group_exist': _('User group "%(usergroup)s" already exists'),
 
            'invalid_usergroup_name':
 
                _('user group name may only contain alphanumeric '
 
                  'characters underscores, periods or dashes and must begin '
 
@@ -409,247 +409,246 @@ def SlugifyName():
 

	
 
    return _validator
 

	
 

	
 
def ValidCloneUri():
 
    from kallithea.lib.utils import make_ui
 

	
 
    def url_handler(repo_type, url, ui):
 
        if repo_type == 'hg':
 
            from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
            if url.startswith('http') or url.startswith('ssh'):
 
                # initially check if it's at least the proper URL
 
                # or does it pass basic auth
 
                MercurialRepository._check_url(url, ui)
 
            elif url.startswith('svn+http'):
 
                from hgsubversion.svnrepo import svnremoterepo
 
                svnremoterepo(ui, url).svn.uuid
 
            elif url.startswith('git+http'):
 
                raise NotImplementedError()
 
            else:
 
                raise Exception('clone from URI %s not allowed' % (url,))
 

	
 
        elif repo_type == 'git':
 
            from kallithea.lib.vcs.backends.git.repository import GitRepository
 
            if url.startswith('http') or url.startswith('git'):
 
                # initially check if it's at least the proper URL
 
                # or does it pass basic auth
 
                GitRepository._check_url(url)
 
            elif url.startswith('svn+http'):
 
                raise NotImplementedError()
 
            elif url.startswith('hg+http'):
 
                raise NotImplementedError()
 
            else:
 
                raise Exception('clone from URI %s not allowed' % (url))
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'clone_uri': _('Invalid repository URL'),
 
            'invalid_clone_uri': _('Invalid repository URL. It must be a '
 
                                   'valid http, https, ssh, svn+http or svn+https URL'),
 
        }
 

	
 
        def validate_python(self, value, state):
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 

	
 
            if url and url != value.get('clone_uri_hidden'):
 
                try:
 
                    url_handler(repo_type, url, make_ui('db', clear_session=False))
 
                except Exception:
 
                    log.exception('URL validation failed')
 
                    msg = self.message('clone_uri', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(clone_uri=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidForkType(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_fork_type': _('Fork has to be the same type as parent')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                msg = self.message('invalid_fork_type', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
    return _validator
 

	
 

	
 
def CanWriteGroup(old_data=None):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create repository in this group"),
 
            'permission_denied_root': _("no permission to create repository "
 
                                        "in root location")
 
        }
 

	
 
        def _to_python(self, value, state):
 
            #root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            # create repositories with write permission on group is set to true
 
            create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
 
            group_admin = HasRepoGroupPermissionAny('group.admin')(gr_name,
 
            group_admin = HasRepoGroupPermissionLevel('admin')(gr_name,
 
                                            'can write into group validator')
 
            group_write = HasRepoGroupPermissionAny('group.write')(gr_name,
 
            group_write = HasRepoGroupPermissionLevel('write')(gr_name,
 
                                            'can write into group validator')
 
            forbidden = not (group_admin or (group_write and create_on_write))
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            gid = (old_data['repo_group'].get('group_id')
 
                   if (old_data and 'repo_group' in old_data) else None)
 
            value_changed = gid != value
 
            new = not old_data
 
            # do check if we changed the value, there's a case that someone got
 
            # revoked write permissions to a repository, he still created, we
 
            # don't need to check permission if he didn't change the value of
 
            # groups in form box
 
            if value_changed or new:
 
                #parent group need to be existing
 
                if gr and forbidden:
 
                    msg = self.message('permission_denied', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 
                ## check if we can write to root location !
 
                elif gr is None and not can_create_repos():
 
                    msg = self.message('permission_denied_root', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create a group in this location")
 
        }
 

	
 
        def to_python(self, value, state):
 
            #root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                #we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and not can_create_in_root
 
            val = HasRepoGroupPermissionAny('group.admin')
 
            forbidden = not val(gr_name, 'can create group validator')
 
            forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = self.message('permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'repo_group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 
    elif type_ == 'user_group':
 
        EMPTY_PERM = 'usergroup.none'
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _('This username or user group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = OrderedSet()
 
            perms_new = OrderedSet()
 
            # build a list of permission to update and new permission to create
 

	
 
            #CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
 
            new_perms_group = defaultdict(dict)
 
            for k, v in value.copy().iteritems():
 
                if k.startswith('perm_new_member'):
 
                    del value[k]
 
                    _type, part = k.split('perm_new_member_')
 
                    args = part.split('_')
 
                    if len(args) == 1:
 
                        new_perms_group[args[0]]['perm'] = v
 
                    elif len(args) == 2:
 
                        _key, pos = args
 
                        new_perms_group[pos][_key] = v
 

	
 
            # fill new permissions in order of how they were added
 
            for k in sorted(map(int, new_perms_group.keys())):
 
                perm_dict = new_perms_group[str(k)]
 
                new_member = perm_dict.get('name')
 
                new_perm = perm_dict.get('perm')
 
                new_type = perm_dict.get('type')
 
                if new_member and new_perm and new_type:
 
                    perms_new.add((new_member, new_perm, new_type))
 

	
 
            for k, v in value.iteritems():
 
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
 
                    if member == User.DEFAULT_USER:
 
                        if str2bool(value.get('repo_private')):
 
                            # set none for default when updating to
 
                            # private repo protects against form manipulation
 
                            v = EMPTY_PERM
 
                    perms_update.add((member, v, t))
 

	
 
            value['perms_updates'] = list(perms_update)
 
            value['perms_new'] = list(perms_new)
 

	
 
            # update permissions
 
            for k, v, t in perms_new:
 
                try:
 
                    if t is 'user':
 
                        self.user_db = User.query() \
 
                            .filter(User.active == True) \
 
                            .filter(User.username == k).one()
 
                    if t is 'users_group':
 
                        self.user_db = UserGroup.query() \
 
                            .filter(UserGroup.users_group_active == True) \
 
                            .filter(UserGroup.users_group_name == k).one()
 

	
 
                except Exception:
 
                    log.exception('Updated permission failed')
 
                    msg = self.message('perm_new_member_type', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(perm_new_member_name=msg)
 
                    )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidSettings():
 
    class _validator(formencode.validators.FancyValidator):
 
        def _to_python(self, value, state):
 
            # settings  form for users that are not admin
 
            # can't edit certain parameters, it's extra backup if they mangle
 
            # with forms
 

	
 
            forbidden_params = [
kallithea/templates/index_base.html
Show inline comments
 
<%page args="parent,group_name=''" />
 
    <div class="panel panel-primary">
 
        <div class="panel-heading clearfix">
 
            <div class="pull-left breadcrumbs">
 
                %if c.group is not None:
 
                    %for group in c.group.parents:
 
                        ${h.link_to(group.name, url('repos_group_home', group_name=group.group_name))}
 
                        &raquo;
 
                    %endfor
 
                    ${c.group.group_name}
 
                %endif
 
            </div>
 

	
 
            %if request.authuser.username != 'default':
 
              <ul class="pull-right links">
 
                <li>
 
                <%
 
                    gr_name = c.group.group_name if c.group else None
 
                    # create repositories with write permission on group is set to true
 
                    create_on_write = h.HasPermissionAny('hg.create.write_on_repogroup.true')()
 
                    group_admin = h.HasRepoGroupPermissionAny('group.admin')(gr_name, 'can write into group index page')
 
                    group_write = h.HasRepoGroupPermissionAny('group.write')(gr_name, 'can write into group index page')
 
                    group_admin = h.HasRepoGroupPermissionLevel('admin')(gr_name, 'can write into group index page')
 
                    group_write = h.HasRepoGroupPermissionLevel('write')(gr_name, 'can write into group index page')
 
                %>
 
                %if h.HasPermissionAny('hg.admin','hg.create.repository')() or (group_admin or (group_write and create_on_write)):
 
                  %if c.group:
 
                        <a href="${h.url('new_repo',parent_group=c.group.group_id)}" class="btn btn-default btn-xs"><i class="icon-plus"></i> ${_('Add Repository')}</a>
 
                        %if h.HasPermissionAny('hg.admin')() or h.HasRepoGroupPermissionAny('group.admin')(c.group.group_name):
 
                        %if h.HasPermissionAny('hg.admin')() or h.HasRepoGroupPermissionLevel('admin')(c.group.group_name):
 
                            <a href="${h.url('new_repos_group', parent_group=c.group.group_id)}" class="btn btn-default btn-xs"><i class="icon-plus"></i> ${_('Add Repository Group')}</a>
 
                        %endif
 
                  %else:
 
                    <a href="${h.url('new_repo')}" class="btn btn-default btn-xs"><i class="icon-plus"></i> ${_('Add Repository')}</a>
 
                    %if h.HasPermissionAny('hg.admin')():
 
                        <a href="${h.url('new_repos_group')}" class="btn btn-default btn-xs"><i class="icon-plus"></i> ${_('Add Repository Group')}</a>
 
                    %endif
 
                  %endif
 
                %endif
 
                %if c.group and h.HasRepoGroupPermissionAny('group.admin')(c.group.group_name):
 
                %if c.group and h.HasRepoGroupPermissionLevel('admin')(c.group.group_name):
 
                    <a href="${h.url('edit_repo_group',group_name=c.group.group_name)}" title="${_('You have admin right to this group, and can edit it')}" class="btn btn-default btn-xs"><i class="icon-pencil"></i> ${_('Edit Repository Group')}</a>
 
                %endif
 
                </li>
 
              </ul>
 
            %endif
 
        </div>
 
        %if c.groups:
 
        <div class="panel-body">
 
              <table id="groups_list" class="table">
 
                  <thead>
 
                      <tr>
 
                          <th class="left">${_('Repository Group')}</th>
 
                          <th class="left">${_('Description')}</th>
 
                          ##<th class="left">${_('Number of Repositories')}</th>
 
                      </tr>
 
                  </thead>
 

	
 
                  ## REPO GROUPS
 
                  % for gr in c.groups:
 
                    <tr>
 
                        <td>
 
                            <div class="dt_repo">
 
                              <a href="${url('repos_group_home',group_name=gr.group_name)}">
 
                                <i class="icon-folder"></i>
 
                                <span class="dt_repo_name">${gr.name}</span>
 
                              </a>
 
                            </div>
 
                        </td>
 
                        <td>${h.urlify_text(gr.group_description, stylize=c.visual.stylify_metatags)}</td>
 
                        ## this is commented out since for multi nested repos can be HEAVY!
 
                        ## in number of executed queries during traversing uncomment at will
 
                        ##<td><b>${gr.repositories_recursive_count}</b></td>
 
                    </tr>
 
                  % endfor
 
              </table>
 
        </div>
 
        %endif
 
        <div class="panel-body">
 
            <table class="table" id="repos_list_wrap"></table>
 
        </div>
 
    </div>
 

	
 
      <script>
 
        $('#groups_list').DataTable({
 
            dom: '<"dataTables_left"f><"dataTables_right"ilp>t',
 
            pageLength: 100
 
        });
 

	
 
        var data = ${c.data|n},
 
            $dataTable = $("#repos_list_wrap").DataTable({
 
                data: data.records,
 
                columns: [
 
                    {data: "raw_name", visible: false, searchable: false},
 
                    {title: "${_('Repository')}", data: "name", orderData: [0,], render: {
 
                        filter: function(data, type, row, meta) {
 
                            return row.just_name;
 
                        }
 
                    }},
 
                    {data: "desc", title: "${_('Description')}", searchable: false},
 
                    {data: "last_change_iso", visible: false, searchable: false},
 
                    {data: "last_change", title: "${_('Last Change')}", orderData: [3,], searchable: false},
 
                    {data: "last_rev_raw", visible: false, searchable: false},
 
                    {data: "last_changeset", title: "${_('Tip')}", orderData: [5,], searchable: false},
 
                    {data: "owner", title: "${_('Owner')}", searchable: false},
 
                    {data: "atom", sortable: false}
 
                ],
 
                order: [[1, "asc"]],
 
                dom: '<"dataTables_left"f><"dataTables_right"ilp>t',
 
                pageLength: 100
 
            });
 
      </script>
0 comments (0 inline, 0 general)