Changeset - ed78b4fbe2a3
kallithea/bin/kallithea_cli_ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import os
 
import re
 
import shlex
 
import sys
 

	
 
import click
 

	
 
import kallithea
 
import kallithea.bin.kallithea_cli_base as cli_base
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.lib.vcs.backends.git.ssh import GitSshHandler
 
from kallithea.lib.vcs.backends.hg.ssh import MercurialSshHandler
 
from kallithea.model.ssh_key import SshKeyModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True, hidden=True)
 
@click.argument('user-id', type=click.INT, required=True)
 
@click.argument('key-id', type=click.INT, required=True)
 
def ssh_serve(user_id, key_id):
 
    """Serve SSH repository protocol access.
 

	
 
    The trusted command that is invoked from .ssh/authorized_keys to serve SSH
 
    protocol access. The access will be granted as the specified user ID, and
 
    logged as using the specified key ID.
 
    """
 
    ssh_enabled = kallithea.CONFIG.get('ssh_enabled', False)
 
    if not str2bool(ssh_enabled):
 
        sys.stderr.write("SSH access is disabled.\n")
 
        return sys.exit(1)
 

	
 
    ssh_locale = kallithea.CONFIG.get('ssh_locale')
 
    if ssh_locale:
 
        os.environ['LC_ALL'] = ssh_locale # trumps everything, including LANG, except LANGUAGE
 
        os.environ['LANGUAGE'] = ssh_locale # trumps LC_ALL for GNU gettext message handling
 

	
 
    ssh_original_command = os.environ.get('SSH_ORIGINAL_COMMAND', '')
 
    client_ip = os.environ.get('SSH_CONNECTION', '').split(' ', 1)[0] or '0.0.0.0'
 
    log.debug('ssh-serve was invoked for SSH command %r from %s', ssh_original_command, client_ip)
 

	
 
    if not ssh_original_command:
 
        if os.environ.get('SSH_CONNECTION'):
 
            sys.stderr.write("'kallithea-cli ssh-serve' can only provide protocol access over SSH. Interactive SSH login for this user is disabled.\n")
 
        else:
 
            sys.stderr.write("'kallithea-cli ssh-serve' cannot be called directly. It must be specified as command in an SSH authorized_keys file.\n")
 
        return sys.exit(1)
 

	
 
    try:
 
        ssh_command_parts = shlex.split(ssh_original_command)
 
    except ValueError as e:
 
        sys.stderr.write('Error parsing SSH command %r: %s\n' % (ssh_original_command, e))
 
        sys.exit(1)
 
    for VcsHandler in [MercurialSshHandler, GitSshHandler]:
 
        vcs_handler = VcsHandler.make(ssh_command_parts)
 
        if vcs_handler is not None:
 
            vcs_handler.serve(user_id, key_id, client_ip)
 
            assert False # serve is written so it never will terminate
 

	
 
    sys.stderr.write("This account can only be used for repository access. SSH command %r is not supported.\n" % ssh_original_command)
 
    sys.exit(1)
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
def ssh_update_authorized_keys():
 
    """Update .ssh/authorized_keys file.
 

	
 
    The file is usually maintained automatically, but this command will also re-write it.
 
    """
 

	
 
    SshKeyModel().write_authorized_keys()
kallithea/controllers/admin/defaults.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.defaults
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
default settings controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 27, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasPermissionAnyDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, render
 
from kallithea.model.db import Setting
 
from kallithea.model.forms import DefaultsForm
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class DefaultsController(BaseController):
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def _before(self, *args, **kwargs):
 
        super(DefaultsController, self)._before(*args, **kwargs)
 

	
 
    def index(self, format='html'):
 
        defaults = Setting.get_default_repo_settings()
 

	
 
        return htmlfill.render(
 
            render('admin/defaults/defaults.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    def update(self, id):
 
        _form = DefaultsForm()()
 

	
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            for k, v in form_result.iteritems():
 
                setting = Setting.create_or_update(k, v)
 
            Session().commit()
 
            h.flash(_('Default settings updated successfully'),
 
                    category='success')
 

	
 
        except formencode.Invalid as errors:
 
            defaults = errors.value
 

	
 
            return htmlfill.render(
 
                render('admin/defaults/defaults.html'),
 
                defaults=defaults,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of defaults'),
 
                    category='error')
 

	
 
        raise HTTPFound(location=url('defaults'))
kallithea/controllers/admin/repo_groups.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.repo_groups
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Repository groups controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 23, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import app_globals, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from webob.exc import HTTPForbidden, HTTPFound, HTTPInternalServerError, HTTPNotFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoGroupPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, render
 
from kallithea.lib.utils2 import safe_int, safe_unicode
 
from kallithea.model.db import RepoGroup, Repository
 
from kallithea.model.forms import RepoGroupForm, RepoGroupPermsForm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import AvailableRepoGroupChoices, RepoGroupList
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoGroupsController(BaseController):
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def _before(self, *args, **kwargs):
 
        super(RepoGroupsController, self)._before(*args, **kwargs)
 

	
 
    def __load_defaults(self, extras=(), exclude=()):
 
        """extras is used for keeping current parent ignoring permissions
 
        exclude is used for not moving group to itself TODO: also exclude descendants
 
        Note: only admin can create top level groups
 
        """
 
        repo_groups = AvailableRepoGroupChoices([], 'admin', extras)
 
        exclude_group_ids = set(rg.group_id for rg in exclude)
 
        c.repo_groups = [rg for rg in repo_groups
 
                         if rg[0] not in exclude_group_ids]
 

	
 
    def __load_data(self, group_id):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param group_id:
 
        """
 
        repo_group = RepoGroup.get_or_404(group_id)
 
        data = repo_group.get_dict()
 
        data['group_name'] = repo_group.name
 

	
 
        # fill repository group users
 
        for p in repo_group.repo_group_to_perm:
 
            data.update({'u_perm_%s' % p.user.username:
 
                             p.permission.permission_name})
 

	
 
        # fill repository group groups
 
        for p in repo_group.users_group_to_perm:
 
            data.update({'g_perm_%s' % p.users_group.users_group_name:
 
                             p.permission.permission_name})
 

	
 
        return data
 

	
 
    def _revoke_perms_on_yourself(self, form_result):
 
        _up = [u for u in form_result['perms_updates'] if request.authuser.username == u[0]]
 
        _new = [u for u in form_result['perms_new'] if request.authuser.username == u[0]]
 
        if _new and _new[0][1] != 'group.admin' or _up and _up[0][1] != 'group.admin':
 
            return True
 
        return False
 

	
 
    def index(self, format='html'):
 
        _list = RepoGroup.query(sorted=True).all()
 
        group_iter = RepoGroupList(_list, perm_level='admin')
 
        repo_groups_data = []
 
        total_records = len(group_iter)
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        repo_group_name = lambda repo_group_name, children_groups: (
 
            template.get_def("repo_group_name")
 
            .render(repo_group_name, children_groups, _=_, h=h, c=c)
 
        )
 
        repo_group_actions = lambda repo_group_id, repo_group_name, gr_count: (
 
            template.get_def("repo_group_actions")
 
            .render(repo_group_id, repo_group_name, gr_count, _=_, h=h, c=c,
 
                    ungettext=ungettext)
 
        )
 

	
 
        for repo_gr in group_iter:
 
            children_groups = [safe_unicode(g.name) for g in repo_gr.parents] + [safe_unicode(repo_gr.name)]
 
            repo_count = repo_gr.repositories.count()
 
            repo_groups_data.append({
 
                "raw_name": repo_gr.group_name,
 
                "group_name": repo_group_name(repo_gr.group_name, children_groups),
 
                "desc": h.escape(repo_gr.group_description),
 
                "repos": repo_count,
 
                "owner": h.person(repo_gr.owner),
 
                "action": repo_group_actions(repo_gr.group_id, repo_gr.group_name,
 
                                             repo_count)
 
            })
 

	
 
        c.data = {
 
            "sort": None,
 
            "dir": "asc",
 
            "records": repo_groups_data
 
        }
 

	
 
        return render('admin/repo_groups/repo_groups.html')
 

	
 
    def create(self):
 
        self.__load_defaults()
 

	
 
        # permissions for can create group based on parent_id are checked
 
        # here in the Form
 
        repo_group_form = RepoGroupForm(repo_groups=c.repo_groups)
 
        try:
 
            form_result = repo_group_form.to_python(dict(request.POST))
 
            gr = RepoGroupModel().create(
 
                group_name=form_result['group_name'],
 
                group_description=form_result['group_description'],
 
                parent=form_result['parent_group_id'],
 
                owner=request.authuser.user_id, # TODO: make editable
 
                copy_permissions=form_result['group_copy_permissions']
 
            )
 
            Session().commit()
 
            # TODO: in future action_logger(, '', '', '')
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('admin/repo_groups/repo_group_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during creation of repository group %s')
 
                    % request.POST.get('group_name'), category='error')
 
            parent_group_id = form_result['parent_group_id']
 
            # TODO: maybe we should get back to the main view, not the admin one
 
            raise HTTPFound(location=url('repos_groups', parent_group=parent_group_id))
 
        h.flash(_('Created repository group %s') % gr.group_name,
 
                category='success')
 
        raise HTTPFound(location=url('repos_group_home', group_name=gr.group_name))
 

	
 
    def new(self):
 
        if HasPermissionAny('hg.admin')('group create'):
 
            # we're global admin, we're ok and we can create TOP level groups
 
            pass
 
        else:
 
            # we pass in parent group into creation form, thus we know
 
            # what would be the group, we can check perms here !
 
            group_id = safe_int(request.GET.get('parent_group'))
 
            group = RepoGroup.get(group_id) if group_id else None
 
            group_name = group.group_name if group else None
 
            if HasRepoGroupPermissionLevel('admin')(group_name, 'group create'):
 
                pass
 
            else:
 
                raise HTTPForbidden()
 

	
 
        self.__load_defaults()
 
        return render('admin/repo_groups/repo_group_add.html')
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def update(self, group_name):
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults(extras=[c.repo_group.parent_group],
 
                             exclude=[c.repo_group])
 

	
 
        # TODO: kill allow_empty_group - it is only used for redundant form validation!
 
        if HasPermissionAny('hg.admin')('group edit'):
 
            # we're global admin, we're ok and we can create TOP level groups
 
            allow_empty_group = True
 
        elif not c.repo_group.parent_group:
 
            allow_empty_group = True
 
        else:
 
            allow_empty_group = False
 
        repo_group_form = RepoGroupForm(
 
            edit=True,
 
            old_data=c.repo_group.get_dict(),
 
            repo_groups=c.repo_groups,
 
            can_create_in_root=allow_empty_group,
 
        )()
 
        try:
 
            form_result = repo_group_form.to_python(dict(request.POST))
 

	
 
            new_gr = RepoGroupModel().update(group_name, form_result)
 
            Session().commit()
 
            h.flash(_('Updated repository group %s')
 
                    % form_result['group_name'], category='success')
 
            # we now have new name !
 
            group_name = new_gr.group_name
 
            # TODO: in future action_logger(, '', '', '')
 
        except formencode.Invalid as errors:
 
            c.active = 'settings'
 
            return htmlfill.render(
 
                render('admin/repo_groups/repo_group_edit.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of repository group %s')
 
                    % request.POST.get('group_name'), category='error')
 

	
 
        raise HTTPFound(location=url('edit_repo_group', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def delete(self, group_name):
 
        gr = c.repo_group = RepoGroup.guess_instance(group_name)
 
        repos = gr.repositories.all()
 
        if repos:
 
            h.flash(_('This group contains %s repositories and cannot be '
 
                      'deleted') % len(repos), category='warning')
 
            raise HTTPFound(location=url('repos_groups'))
 

	
 
        children = gr.children.all()
 
        if children:
 
            h.flash(_('This group contains %s subgroups and cannot be deleted'
 
                      % (len(children))), category='warning')
 
            raise HTTPFound(location=url('repos_groups'))
 

	
 
        try:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            h.flash(_('Removed repository group %s') % group_name,
 
                    category='success')
 
            # TODO: in future action_logger(, '', '', '')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during deletion of repository group %s')
 
                    % group_name, category='error')
 

	
 
        if gr.parent_group:
 
            raise HTTPFound(location=url('repos_group_home', group_name=gr.parent_group.group_name))
 
        raise HTTPFound(location=url('repos_groups'))
 

	
 
    def show_by_name(self, group_name):
 
        """
 
        This is a proxy that does a lookup group_name -> id, and shows
 
        the group by id view instead
 
        """
 
        group_name = group_name.rstrip('/')
 
        id_ = RepoGroup.get_by_group_name(group_name)
 
        if id_:
 
            return self.show(group_name)
 
        raise HTTPNotFound
 

	
 
    @HasRepoGroupPermissionLevelDecorator('read')
 
    def show(self, group_name):
 
        c.active = 'settings'
 

	
 
        c.group = c.repo_group = RepoGroup.guess_instance(group_name)
 

	
 
        groups = RepoGroup.query(sorted=True).filter_by(parent_group=c.group).all()
 
        repo_groups_list = self.scm_model.get_repo_groups(groups)
 

	
 
        repos_list = Repository.query(sorted=True).filter_by(group=c.group).all()
 
        c.data = RepoModel().get_repos_as_dict(repos_list,
 
                                               repo_groups_list=repo_groups_list,
 
                                               short_name=True)
 

	
 
        return render('admin/repo_groups/repo_group_show.html')
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit(self, group_name):
 
        c.active = 'settings'
 

	
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults(extras=[c.repo_group.parent_group],
 
                             exclude=[c.repo_group])
 
        defaults = self.__load_data(c.repo_group.group_id)
 

	
 
        return htmlfill.render(
 
            render('admin/repo_groups/repo_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit_repo_group_advanced(self, group_name):
 
        c.active = 'advanced'
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 

	
 
        return render('admin/repo_groups/repo_group_edit.html')
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit_repo_group_perms(self, group_name):
 
        c.active = 'perms'
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults()
 
        defaults = self.__load_data(c.repo_group.group_id)
 

	
 
        return htmlfill.render(
 
            render('admin/repo_groups/repo_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def update_perms(self, group_name):
 
        """
 
        Update permissions for given repository group
 

	
 
        :param group_name:
 
        """
 

	
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        valid_recursive_choices = ['none', 'repos', 'groups', 'all']
 
        form_result = RepoGroupPermsForm(valid_recursive_choices)().to_python(request.POST)
 
        if not request.authuser.is_admin:
 
            if self._revoke_perms_on_yourself(form_result):
 
                msg = _('Cannot revoke permission for yourself as admin')
 
                h.flash(msg, category='warning')
 
                raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 
        recursive = form_result['recursive']
 
        # iterate over all members(if in recursive mode) of this groups and
 
        # set the permissions !
 
        # this can be potentially heavy operation
 
        RepoGroupModel()._update_permissions(c.repo_group,
 
                                             form_result['perms_new'],
 
                                             form_result['perms_updates'],
 
                                             recursive)
 
        # TODO: implement this
 
        #action_logger(request.authuser, 'admin_changed_repo_permissions',
 
        #              repo_name, request.ip_addr)
 
        Session().commit()
 
        h.flash(_('Repository group permissions updated'), category='success')
 
        raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def delete_perms(self, group_name):
 
        try:
 
            obj_type = request.POST.get('obj_type')
 
            obj_id = None
 
            if obj_type == 'user':
 
                obj_id = safe_int(request.POST.get('user_id'))
 
            elif obj_type == 'user_group':
 
                obj_id = safe_int(request.POST.get('user_group_id'))
 

	
 
            if not request.authuser.is_admin:
 
                if obj_type == 'user' and request.authuser.user_id == obj_id:
 
                    msg = _('Cannot revoke permission for yourself as admin')
 
                    h.flash(msg, category='warning')
 
                    raise Exception('revoke admin permission on self')
 
            recursive = request.POST.get('recursive', 'none')
 
            if obj_type == 'user':
 
                RepoGroupModel().delete_permission(repo_group=group_name,
 
                                                   obj=obj_id, obj_type='user',
 
                                                   recursive=recursive)
 
            elif obj_type == 'user_group':
 
                RepoGroupModel().delete_permission(repo_group=group_name,
 
                                                   obj=obj_id,
 
                                                   obj_type='user_group',
 
                                                   recursive=recursive)
 

	
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during revoking of permission'),
 
                    category='error')
 
            raise HTTPInternalServerError()
kallithea/controllers/admin/user_groups.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.user_groups
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
User Groups crud controller
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 25, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from sqlalchemy.orm import joinedload
 
from sqlalchemy.sql.expression import func
 
from tg import app_globals, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound, HTTPInternalServerError
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasPermissionAnyDecorator, HasUserGroupPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, render
 
from kallithea.lib.exceptions import RepoGroupAssignmentError, UserGroupsAssignedException
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import safe_int, safe_unicode
 
from kallithea.model.db import User, UserGroup, UserGroupRepoGroupToPerm, UserGroupRepoToPerm, UserGroupToPerm
 
from kallithea.model.forms import CustomDefaultPermissionsForm, UserGroupForm, UserGroupPermsForm
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import UserGroupList
 
from kallithea.model.user_group import UserGroupModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserGroupsController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def _before(self, *args, **kwargs):
 
        super(UserGroupsController, self)._before(*args, **kwargs)
 

	
 
    def __load_data(self, user_group_id):
 
        c.group_members_obj = sorted((x.user for x in c.user_group.members),
 
                                     key=lambda u: u.username.lower())
 

	
 
        c.group_members = [(x.user_id, x.username) for x in c.group_members_obj]
 
        c.available_members = sorted(((x.user_id, x.username) for x in
 
                                      User.query().all()),
 
                                     key=lambda u: u[1].lower())
 

	
 
    def __load_defaults(self, user_group_id):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param user_group_id:
 
        """
 
        user_group = UserGroup.get_or_404(user_group_id)
 
        data = user_group.get_dict()
 
        return data
 

	
 
    def index(self, format='html'):
 
        _list = UserGroup.query() \
 
                        .order_by(func.lower(UserGroup.users_group_name)) \
 
                        .all()
 
        group_iter = UserGroupList(_list, perm_level='admin')
 
        user_groups_data = []
 
        total_records = len(group_iter)
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        user_group_name = lambda user_group_id, user_group_name: (
 
            template.get_def("user_group_name")
 
            .render(user_group_id, user_group_name, _=_, h=h, c=c)
 
        )
 
        user_group_actions = lambda user_group_id, user_group_name: (
 
            template.get_def("user_group_actions")
 
            .render(user_group_id, user_group_name, _=_, h=h, c=c)
 
        )
 
        for user_gr in group_iter:
 

	
 
            user_groups_data.append({
 
                "raw_name": user_gr.users_group_name,
 
                "group_name": user_group_name(user_gr.users_group_id,
 
                                              user_gr.users_group_name),
 
                "desc": h.escape(user_gr.user_group_description),
 
                "members": len(user_gr.members),
 
                "active": h.boolicon(user_gr.users_group_active),
 
                "owner": h.person(user_gr.owner.username),
 
                "action": user_group_actions(user_gr.users_group_id, user_gr.users_group_name)
 
            })
 

	
 
        c.data = {
 
            "sort": None,
 
            "dir": "asc",
 
            "records": user_groups_data
 
        }
 

	
 
        return render('admin/user_groups/user_groups.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
 
    def create(self):
 
        users_group_form = UserGroupForm()()
 
        try:
 
            form_result = users_group_form.to_python(dict(request.POST))
 
            ug = UserGroupModel().create(name=form_result['users_group_name'],
 
                                         description=form_result['user_group_description'],
 
                                         owner=request.authuser.user_id,
 
                                         active=form_result['users_group_active'])
 

	
 
            gr = form_result['users_group_name']
 
            action_logger(request.authuser,
 
                          'admin_created_users_group:%s' % gr,
 
                          None, request.ip_addr)
 
            h.flash(h.HTML(_('Created user group %s')) % h.link_to(gr, url('edit_users_group', id=ug.users_group_id)),
 
                category='success')
 
            Session().commit()
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('admin/user_groups/user_group_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during creation of user group %s')
 
                    % request.POST.get('users_group_name'), category='error')
 

	
 
        raise HTTPFound(location=url('users_groups'))
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
 
    def new(self, format='html'):
 
        return render('admin/user_groups/user_group_add.html')
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def update(self, id):
 
        c.user_group = UserGroup.get_or_404(id)
 
        c.active = 'settings'
 
        self.__load_data(id)
 

	
 
        available_members = [safe_unicode(x[0]) for x in c.available_members]
 

	
 
        users_group_form = UserGroupForm(edit=True,
 
                                         old_data=c.user_group.get_dict(),
 
                                         available_members=available_members)()
 

	
 
        try:
 
            form_result = users_group_form.to_python(request.POST)
 
            UserGroupModel().update(c.user_group, form_result)
 
            gr = form_result['users_group_name']
 
            action_logger(request.authuser,
 
                          'admin_updated_users_group:%s' % gr,
 
                          None, request.ip_addr)
 
            h.flash(_('Updated user group %s') % gr, category='success')
 
            Session().commit()
 
        except formencode.Invalid as errors:
 
            ug_model = UserGroupModel()
 
            defaults = errors.value
 
            e = errors.error_dict or {}
 
            defaults.update({
 
                'create_repo_perm': ug_model.has_perm(id,
 
                                                      'hg.create.repository'),
 
                'fork_repo_perm': ug_model.has_perm(id,
 
                                                    'hg.fork.repository'),
 
            })
 

	
 
            return htmlfill.render(
 
                render('admin/user_groups/user_group_edit.html'),
 
                defaults=defaults,
 
                errors=e,
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of user group %s')
 
                    % request.POST.get('users_group_name'), category='error')
 

	
 
        raise HTTPFound(location=url('edit_users_group', id=id))
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def delete(self, id):
 
        usr_gr = UserGroup.get_or_404(id)
 
        try:
 
            UserGroupModel().delete(usr_gr)
 
            Session().commit()
 
            h.flash(_('Successfully deleted user group'), category='success')
 
        except UserGroupsAssignedException as e:
 
            h.flash(e, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during deletion of user group'),
 
                    category='error')
 
        raise HTTPFound(location=url('users_groups'))
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def edit(self, id, format='html'):
 
        c.user_group = UserGroup.get_or_404(id)
 
        c.active = 'settings'
 
        self.__load_data(id)
 

	
 
        defaults = self.__load_defaults(id)
 

	
 
        return htmlfill.render(
 
            render('admin/user_groups/user_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def edit_perms(self, id):
 
        c.user_group = UserGroup.get_or_404(id)
 
        c.active = 'perms'
 

	
 
        defaults = {}
 
        # fill user group users
 
        for p in c.user_group.user_user_group_to_perm:
 
            defaults.update({'u_perm_%s' % p.user.username:
 
                             p.permission.permission_name})
 

	
 
        for p in c.user_group.user_group_user_group_to_perm:
 
            defaults.update({'g_perm_%s' % p.user_group.users_group_name:
 
                             p.permission.permission_name})
 

	
 
        return htmlfill.render(
 
            render('admin/user_groups/user_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def update_perms(self, id):
 
        """
 
        grant permission for given usergroup
 

	
 
        :param id:
 
        """
 
        user_group = UserGroup.get_or_404(id)
 
        form = UserGroupPermsForm()().to_python(request.POST)
 

	
 
        # set the permissions !
 
        try:
 
            UserGroupModel()._update_permissions(user_group, form['perms_new'],
 
                                                 form['perms_updates'])
 
        except RepoGroupAssignmentError:
 
            h.flash(_('Target group cannot be the same'), category='error')
 
            raise HTTPFound(location=url('edit_user_group_perms', id=id))
 
        # TODO: implement this
 
        #action_logger(request.authuser, 'admin_changed_repo_permissions',
 
        #              repo_name, request.ip_addr)
 
        Session().commit()
 
        h.flash(_('User group permissions updated'), category='success')
 
        raise HTTPFound(location=url('edit_user_group_perms', id=id))
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def delete_perms(self, id):
 
        try:
 
            obj_type = request.POST.get('obj_type')
 
            obj_id = None
 
            if obj_type == 'user':
 
                obj_id = safe_int(request.POST.get('user_id'))
 
            elif obj_type == 'user_group':
 
                obj_id = safe_int(request.POST.get('user_group_id'))
 

	
 
            if not request.authuser.is_admin:
 
                if obj_type == 'user' and request.authuser.user_id == obj_id:
 
                    msg = _('Cannot revoke permission for yourself as admin')
 
                    h.flash(msg, category='warning')
 
                    raise Exception('revoke admin permission on self')
 
            if obj_type == 'user':
 
                UserGroupModel().revoke_user_permission(user_group=id,
 
                                                        user=obj_id)
 
            elif obj_type == 'user_group':
 
                UserGroupModel().revoke_user_group_permission(target_user_group=id,
 
                                                              user_group=obj_id)
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during revoking of permission'),
 
                    category='error')
 
            raise HTTPInternalServerError()
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def edit_default_perms(self, id):
 
        c.user_group = UserGroup.get_or_404(id)
 
        c.active = 'default_perms'
 

	
 
        permissions = {
 
            'repositories': {},
 
            'repositories_groups': {}
 
        }
 
        ugroup_repo_perms = UserGroupRepoToPerm.query() \
 
            .options(joinedload(UserGroupRepoToPerm.permission)) \
 
            .options(joinedload(UserGroupRepoToPerm.repository)) \
 
            .filter(UserGroupRepoToPerm.users_group_id == id) \
 
            .all()
 

	
 
        for gr in ugroup_repo_perms:
 
            permissions['repositories'][gr.repository.repo_name]  \
 
                = gr.permission.permission_name
 

	
 
        ugroup_group_perms = UserGroupRepoGroupToPerm.query() \
 
            .options(joinedload(UserGroupRepoGroupToPerm.permission)) \
 
            .options(joinedload(UserGroupRepoGroupToPerm.group)) \
 
            .filter(UserGroupRepoGroupToPerm.users_group_id == id) \
 
            .all()
 

	
 
        for gr in ugroup_group_perms:
 
            permissions['repositories_groups'][gr.group.group_name] \
 
                = gr.permission.permission_name
 
        c.permissions = permissions
 

	
 
        ug_model = UserGroupModel()
 

	
 
        defaults = c.user_group.get_dict()
 
        defaults.update({
 
            'create_repo_perm': ug_model.has_perm(c.user_group,
 
                                                  'hg.create.repository'),
 
            'create_user_group_perm': ug_model.has_perm(c.user_group,
 
                                                        'hg.usergroup.create.true'),
 
            'fork_repo_perm': ug_model.has_perm(c.user_group,
 
                                                'hg.fork.repository'),
 
        })
 

	
 
        return htmlfill.render(
 
            render('admin/user_groups/user_group_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def update_default_perms(self, id):
 
        user_group = UserGroup.get_or_404(id)
 

	
 
        try:
 
            form = CustomDefaultPermissionsForm()()
 
            form_result = form.to_python(request.POST)
 

	
 
            usergroup_model = UserGroupModel()
 

	
 
            defs = UserGroupToPerm.query() \
 
                .filter(UserGroupToPerm.users_group == user_group) \
 
                .all()
 
            for ug in defs:
 
                Session().delete(ug)
 

	
 
            if form_result['create_repo_perm']:
 
                usergroup_model.grant_perm(id, 'hg.create.repository')
 
            else:
 
                usergroup_model.grant_perm(id, 'hg.create.none')
 
            if form_result['create_user_group_perm']:
 
                usergroup_model.grant_perm(id, 'hg.usergroup.create.true')
 
            else:
 
                usergroup_model.grant_perm(id, 'hg.usergroup.create.false')
 
            if form_result['fork_repo_perm']:
 
                usergroup_model.grant_perm(id, 'hg.fork.repository')
 
            else:
 
                usergroup_model.grant_perm(id, 'hg.fork.none')
 

	
 
            h.flash(_("Updated permissions"), category='success')
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during permissions saving'),
 
                    category='error')
 

	
 
        raise HTTPFound(location=url('edit_user_group_default_perms', id=id))
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def edit_advanced(self, id):
 
        c.user_group = UserGroup.get_or_404(id)
 
        c.active = 'advanced'
 
        c.group_members_obj = sorted((x.user for x in c.user_group.members),
 
                                     key=lambda u: u.username.lower())
 
        return render('admin/user_groups/user_group_edit.html')
 

	
 
    @HasUserGroupPermissionLevelDecorator('admin')
 
    def edit_members(self, id):
 
        c.user_group = UserGroup.get_or_404(id)
 
        c.active = 'members'
 
        c.group_members_obj = sorted((x.user for x in c.user_group.members),
 
                                     key=lambda u: u.username.lower())
 

	
 
        c.group_members = [(x.user_id, x.username) for x in c.group_members_obj]
 
        return render('admin/user_groups/user_group_edit.html')
kallithea/controllers/admin/users.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.users
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Users crud controller
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from sqlalchemy.sql.expression import func
 
from tg import app_globals, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound, HTTPNotFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib import auth_modules
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, IfSshEnabled, render
 
from kallithea.lib.exceptions import DefaultUserException, UserCreationError, UserOwnsReposException
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import datetime_to_time, generate_api_key, safe_int
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model.db import User, UserEmailMap, UserIpMap, UserToPerm
 
from kallithea.model.forms import CustomDefaultPermissionsForm, UserForm
 
from kallithea.model.meta import Session
 
from kallithea.model.ssh_key import SshKeyModel, SshKeyModelException
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UsersController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def _before(self, *args, **kwargs):
 
        super(UsersController, self)._before(*args, **kwargs)
 

	
 
    def index(self, format='html'):
 
        c.users_list = User.query().order_by(User.username) \
 
                        .filter_by(is_default_user=False) \
 
                        .order_by(func.lower(User.username)) \
 
                        .all()
 

	
 
        users_data = []
 
        total_records = len(c.users_list)
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        grav_tmpl = '<div class="gravatar">%s</div>'
 

	
 
        username = lambda user_id, username: (
 
                template.get_def("user_name")
 
                .render(user_id, username, _=_, h=h, c=c))
 

	
 
        user_actions = lambda user_id, username: (
 
                template.get_def("user_actions")
 
                .render(user_id, username, _=_, h=h, c=c))
 

	
 
        for user in c.users_list:
 
            users_data.append({
 
                "gravatar": grav_tmpl % h.gravatar(user.email, size=20),
 
                "raw_name": user.username,
 
                "username": username(user.user_id, user.username),
 
                "firstname": h.escape(user.name),
 
                "lastname": h.escape(user.lastname),
 
                "last_login": h.fmt_date(user.last_login),
 
                "last_login_raw": datetime_to_time(user.last_login),
 
                "active": h.boolicon(user.active),
 
                "admin": h.boolicon(user.admin),
 
                "extern_type": user.extern_type,
 
                "extern_name": user.extern_name,
 
                "action": user_actions(user.user_id, user.username),
 
            })
 

	
 
        c.data = {
 
            "sort": None,
 
            "dir": "asc",
 
            "records": users_data
 
        }
 

	
 
        return render('admin/users/users.html')
 

	
 
    def create(self):
 
        c.default_extern_type = User.DEFAULT_AUTH_TYPE
 
        c.default_extern_name = ''
 
        user_model = UserModel()
 
        user_form = UserForm()()
 
        try:
 
            form_result = user_form.to_python(dict(request.POST))
 
            user = user_model.create(form_result)
 
            action_logger(request.authuser, 'admin_created_user:%s' % user.username,
 
                          None, request.ip_addr)
 
            h.flash(_('Created user %s') % user.username,
 
                    category='success')
 
            Session().commit()
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('admin/users/user_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except UserCreationError as e:
 
            h.flash(e, 'error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during creation of user %s')
 
                    % request.POST.get('username'), category='error')
 
        raise HTTPFound(location=url('edit_user', id=user.user_id))
 

	
 
    def new(self, format='html'):
 
        c.default_extern_type = User.DEFAULT_AUTH_TYPE
 
        c.default_extern_name = ''
 
        return render('admin/users/user_add.html')
 

	
 
    def update(self, id):
 
        user_model = UserModel()
 
        user = user_model.get(id)
 
        _form = UserForm(edit=True, old_data={'user_id': id,
 
                                              'email': user.email})()
 
        form_result = {}
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            skip_attrs = ['extern_type', 'extern_name',
 
                         ] + auth_modules.get_managed_fields(user)
 

	
 
            user_model.update(id, form_result, skip_attrs=skip_attrs)
 
            usr = form_result['username']
 
            action_logger(request.authuser, 'admin_updated_user:%s' % usr,
 
                          None, request.ip_addr)
 
            h.flash(_('User updated successfully'), category='success')
 
            Session().commit()
 
        except formencode.Invalid as errors:
 
            defaults = errors.value
 
            e = errors.error_dict or {}
 
            defaults.update({
 
                'create_repo_perm': user_model.has_perm(id,
 
                                                        'hg.create.repository'),
 
                'fork_repo_perm': user_model.has_perm(id, 'hg.fork.repository'),
 
            })
 
            return htmlfill.render(
 
                self._render_edit_profile(user),
 
                defaults=defaults,
 
                errors=e,
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of user %s')
 
                    % form_result.get('username'), category='error')
 
        raise HTTPFound(location=url('edit_user', id=id))
 

	
 
    def delete(self, id):
 
        usr = User.get_or_404(id)
 
        try:
 
            UserModel().delete(usr)
 
            Session().commit()
 
            h.flash(_('Successfully deleted user'), category='success')
 
        except (UserOwnsReposException, DefaultUserException) as e:
 
            h.flash(e, category='warning')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during deletion of user'),
 
                    category='error')
 
        raise HTTPFound(location=url('users'))
 

	
 
    def _get_user_or_raise_if_default(self, id):
 
        try:
 
            return User.get_or_404(id, allow_default=False)
 
        except DefaultUserException:
 
            h.flash(_("The default user cannot be edited"), category='warning')
 
            raise HTTPNotFound
 

	
 
    def _render_edit_profile(self, user):
 
        c.user = user
 
        c.active = 'profile'
 
        c.perm_user = AuthUser(dbuser=user)
 
        managed_fields = auth_modules.get_managed_fields(user)
 
        c.readonly = lambda n: 'readonly' if n in managed_fields else None
 
        return render('admin/users/user_edit.html')
 

	
 
    def edit(self, id, format='html'):
 
        user = self._get_user_or_raise_if_default(id)
 
        defaults = user.get_dict()
 

	
 
        return htmlfill.render(
 
            self._render_edit_profile(user),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def edit_advanced(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'advanced'
 
        c.perm_user = AuthUser(dbuser=c.user)
 

	
 
        umodel = UserModel()
 
        defaults = c.user.get_dict()
 
        defaults.update({
 
            'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
 
            'create_user_group_perm': umodel.has_perm(c.user,
 
                                                      'hg.usergroup.create.true'),
 
            'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
 
        })
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def edit_api_keys(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'api_keys'
 
        show_expired = True
 
        c.lifetime_values = [
 
            (str(-1), _('Forever')),
 
            (str(5), _('5 minutes')),
 
            (str(60), _('1 hour')),
 
            (str(60 * 24), _('1 day')),
 
            (str(60 * 24 * 30), _('1 month')),
 
        ]
 
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
 
        c.user_api_keys = ApiKeyModel().get_api_keys(c.user.user_id,
 
                                                     show_expired=show_expired)
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_api_key(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        lifetime = safe_int(request.POST.get('lifetime'), -1)
 
        description = request.POST.get('description')
 
        ApiKeyModel().create(c.user.user_id, description, lifetime)
 
        Session().commit()
 
        h.flash(_("API key successfully created"), category='success')
 
        raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))
 

	
 
    def delete_api_key(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        api_key = request.POST.get('del_api_key')
 
        if request.POST.get('del_api_key_builtin'):
 
            c.user.api_key = generate_api_key()
 
            Session().commit()
 
            h.flash(_("API key successfully reset"), category='success')
 
        elif api_key:
 
            ApiKeyModel().delete(api_key, c.user.user_id)
 
            Session().commit()
 
            h.flash(_("API key successfully deleted"), category='success')
 

	
 
        raise HTTPFound(location=url('edit_user_api_keys', id=c.user.user_id))
 

	
 
    def update_account(self, id):
 
        pass
 

	
 
    def edit_perms(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'perms'
 
        c.perm_user = AuthUser(dbuser=c.user)
 

	
 
        umodel = UserModel()
 
        defaults = c.user.get_dict()
 
        defaults.update({
 
            'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
 
            'create_user_group_perm': umodel.has_perm(c.user,
 
                                                      'hg.usergroup.create.true'),
 
            'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
 
        })
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def update_perms(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 

	
 
        try:
 
            form = CustomDefaultPermissionsForm()()
 
            form_result = form.to_python(request.POST)
 

	
 
            user_model = UserModel()
 

	
 
            defs = UserToPerm.query() \
 
                .filter(UserToPerm.user == user) \
 
                .all()
 
            for ug in defs:
 
                Session().delete(ug)
 

	
 
            if form_result['create_repo_perm']:
 
                user_model.grant_perm(id, 'hg.create.repository')
 
            else:
 
                user_model.grant_perm(id, 'hg.create.none')
 
            if form_result['create_user_group_perm']:
 
                user_model.grant_perm(id, 'hg.usergroup.create.true')
 
            else:
 
                user_model.grant_perm(id, 'hg.usergroup.create.false')
 
            if form_result['fork_repo_perm']:
 
                user_model.grant_perm(id, 'hg.fork.repository')
 
            else:
 
                user_model.grant_perm(id, 'hg.fork.none')
 
            h.flash(_("Updated permissions"), category='success')
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during permissions saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_user_perms', id=id))
 

	
 
    def edit_emails(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'emails'
 
        c.user_email_map = UserEmailMap.query() \
 
            .filter(UserEmailMap.user == c.user).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_email(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 
        email = request.POST.get('new_email')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_email(id, email)
 
            Session().commit()
 
            h.flash(_("Added email %s to user") % email, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['email']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during email saving'),
 
                    category='error')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def delete_email(self, id):
 
        user = self._get_user_or_raise_if_default(id)
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        raise HTTPFound(location=url('edit_user_emails', id=id))
 

	
 
    def edit_ips(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ips'
 
        c.user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == c.user).all()
 

	
 
        c.default_user_ip_map = UserIpMap.query() \
 
            .filter(UserIpMap.user == User.get_default_user()).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_ip(self, id):
 
        ip = request.POST.get('new_ip')
 
        user_model = UserModel()
 

	
 
        try:
 
            user_model.add_extra_ip(id, ip)
 
            Session().commit()
 
            h.flash(_("Added IP address %s to user whitelist") % ip, category='success')
 
        except formencode.Invalid as error:
 
            msg = error.error_dict['ip']
 
            h.flash(msg, category='error')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred while adding IP address'),
 
                    category='error')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
 

	
 
    def delete_ip(self, id):
 
        ip_id = request.POST.get('del_ip_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_ip(id, ip_id)
 
        Session().commit()
 
        h.flash(_("Removed IP address from user whitelist"), category='success')
 

	
 
        if 'default_user' in request.POST:
 
            raise HTTPFound(location=url('admin_permissions_ips'))
 
        raise HTTPFound(location=url('edit_user_ips', id=id))
 

	
 
    @IfSshEnabled
 
    def edit_ssh_keys(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ssh_keys'
 
        c.user_ssh_keys = SshKeyModel().get_ssh_keys(c.user.user_id)
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @IfSshEnabled
 
    def ssh_keys_add(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        description = request.POST.get('description')
 
        public_key = request.POST.get('public_key')
 
        try:
 
            new_ssh_key = SshKeyModel().create(c.user.user_id,
 
                                               description, public_key)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key %s successfully added") % new_ssh_key.fingerprint, category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
 

	
 
    @IfSshEnabled
 
    def ssh_keys_delete(self, id):
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        public_key = request.POST.get('del_public_key')
 
        try:
 
            SshKeyModel().delete(public_key, c.user.user_id)
 
            Session().commit()
 
            SshKeyModel().write_authorized_keys()
 
            h.flash(_("SSH key successfully deleted"), category='success')
 
        except SshKeyModelException as errors:
 
            h.flash(errors.message, category='error')
 
        raise HTTPFound(location=url('edit_user_ssh_keys', id=c.user.user_id))
kallithea/controllers/changelog.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.changelog
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
changelog controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
from tg import request, session
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPFound, HTTPNotFound
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.config.routing import url
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import safe_int, safe_str
 
from kallithea.lib.utils2 import safe_int
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, ChangesetError, EmptyRepositoryError, NodeDoesNotExistError, RepositoryError
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ChangelogController(BaseRepoController):
 

	
 
    def _before(self, *args, **kwargs):
 
        super(ChangelogController, self)._before(*args, **kwargs)
 
        c.affected_files_cut_off = 60
 

	
 
    @staticmethod
 
    def __get_cs(rev, repo):
 
        """
 
        Safe way to get changeset. If error occur fail with error message.
 

	
 
        :param rev: revision to fetch
 
        :param repo: repo instance
 
        """
 

	
 
        try:
 
            return c.db_repo_scm_instance.get_changeset(rev)
 
        except EmptyRepositoryError as e:
 
            h.flash(_('There are no changesets yet'), category='error')
 
        except RepositoryError as e:
 
            log.error(traceback.format_exc())
 
            h.flash(unicode(e), category='error')
 
        raise HTTPBadRequest()
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self, repo_name, revision=None, f_path=None):
 
        limit = 2000
 
        default = 100
 
        if request.GET.get('size'):
 
            c.size = max(min(safe_int(request.GET.get('size')), limit), 1)
 
            session['changelog_size'] = c.size
 
            session.save()
 
        else:
 
            c.size = int(session.get('changelog_size', default))
 
        # min size must be 1
 
        c.size = max(c.size, 1)
 
        p = safe_int(request.GET.get('page'), 1)
 
        branch_name = request.GET.get('branch', None)
 
        if (branch_name and
 
            branch_name not in c.db_repo_scm_instance.branches and
 
            branch_name not in c.db_repo_scm_instance.closed_branches and
 
            not revision
 
        ):
 
            raise HTTPFound(location=url('changelog_file_home', repo_name=c.repo_name,
 
                                    revision=branch_name, f_path=f_path or ''))
 

	
 
        if revision == 'tip':
 
            revision = None
 

	
 
        c.changelog_for_path = f_path
 
        try:
 

	
 
            if f_path:
 
                log.debug('generating changelog for path %s', f_path)
 
                # get the history for the file !
 
                tip_cs = c.db_repo_scm_instance.get_changeset()
 
                try:
 
                    collection = tip_cs.get_file_history(f_path)
 
                except (NodeDoesNotExistError, ChangesetError):
 
                    # this node is not present at tip !
 
                    try:
 
                        cs = self.__get_cs(revision, repo_name)
 
                        collection = cs.get_file_history(f_path)
 
                    except RepositoryError as e:
 
                        h.flash(unicode(e), category='warning')
 
                        raise HTTPFound(location=h.url('changelog_home', repo_name=repo_name))
 
            else:
 
                collection = c.db_repo_scm_instance.get_changesets(start=0, end=revision,
 
                                                        branch_name=branch_name, reverse=True)
 
            c.total_cs = len(collection)
 

	
 
            c.cs_pagination = Page(collection, page=p, item_count=c.total_cs, items_per_page=c.size,
 
                                   branch=branch_name)
 

	
 
            page_revisions = [x.raw_id for x in c.cs_pagination]
 
            c.cs_comments = c.db_repo.get_comments(page_revisions)
 
            c.cs_statuses = c.db_repo.statuses(page_revisions)
 
        except EmptyRepositoryError as e:
 
            h.flash(unicode(e), category='warning')
 
            raise HTTPFound(location=url('summary_home', repo_name=c.repo_name))
 
        except (RepositoryError, ChangesetDoesNotExistError, Exception) as e:
 
            log.error(traceback.format_exc())
 
            h.flash(unicode(e), category='error')
 
            raise HTTPFound(location=url('changelog_home', repo_name=c.repo_name))
 

	
 
        c.branch_name = branch_name
 
        c.branch_filters = [('', _('None'))] + \
 
            [(k, k) for k in c.db_repo_scm_instance.branches.keys()]
 
        if c.db_repo_scm_instance.closed_branches:
 
            prefix = _('(closed)') + ' '
 
            c.branch_filters += [('-', '-')] + \
 
                [(k, prefix + k) for k in c.db_repo_scm_instance.closed_branches.keys()]
 
        revs = []
 
        if not f_path:
 
            revs = [x.revision for x in c.cs_pagination]
 
        c.jsdata = graph_data(c.db_repo_scm_instance, revs)
 

	
 
        c.revision = revision # requested revision ref
 
        c.first_revision = c.cs_pagination[0] # pagination is never empty here!
 
        return render('changelog/changelog.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def changelog_details(self, cs):
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            c.cs = c.db_repo_scm_instance.get_changeset(cs)
 
            return render('changelog/changelog_details.html')
 
        raise HTTPNotFound()
kallithea/controllers/summary.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.summary
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Summary controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import calendar
 
import itertools
 
import logging
 
import traceback
 
from datetime import date, timedelta
 
from time import mktime
 

	
 
from beaker.cache import cache_region
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.config.conf import ALL_EXTS, ALL_READMES, LANGUAGES_EXTENSIONS_MAP
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.celerylib.tasks import get_commits_stats
 
from kallithea.lib.compat import json
 
from kallithea.lib.markup_renderer import MarkupRenderer
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import safe_int, safe_str
 
from kallithea.lib.utils2 import safe_int
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetError, EmptyRepositoryError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.model.db import Statistics
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
README_FILES = [''.join([x[0][0], x[1][0]]) for x in
 
                    sorted(list(itertools.product(ALL_READMES, ALL_EXTS)),
 
                           key=lambda y:y[0][1] + y[1][1])]
 

	
 

	
 
class SummaryController(BaseRepoController):
 

	
 
    def __get_readme_data(self, db_repo):
 
        repo_name = db_repo.repo_name
 
        log.debug('Looking for README file')
 

	
 
        @cache_region('long_term', '_get_readme_from_cache')
 
        def _get_readme_from_cache(*_cache_keys):  # parameters are not really used - only as caching key
 
            readme_data = None
 
            readme_file = None
 
            try:
 
                # gets the landing revision! or tip if fails
 
                cs = db_repo.get_landing_changeset()
 
                if isinstance(cs, EmptyChangeset):
 
                    raise EmptyRepositoryError()
 
                renderer = MarkupRenderer()
 
                for f in README_FILES:
 
                    try:
 
                        readme = cs.get_node(f)
 
                        if not isinstance(readme, FileNode):
 
                            continue
 
                        readme_file = f
 
                        log.debug('Found README file `%s` rendering...',
 
                                  readme_file)
 
                        readme_data = renderer.render(readme.content,
 
                                                      filename=f)
 
                        break
 
                    except NodeDoesNotExistError:
 
                        continue
 
            except ChangesetError:
 
                log.error(traceback.format_exc())
 
                pass
 
            except EmptyRepositoryError:
 
                pass
 

	
 
            return readme_data, readme_file
 

	
 
        kind = 'README'
 
        return _get_readme_from_cache(repo_name, kind, c.db_repo.changeset_cache.get('raw_id'))
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self, repo_name):
 
        p = safe_int(request.GET.get('page'), 1)
 
        size = safe_int(request.GET.get('size'), 10)
 
        try:
 
            collection = c.db_repo_scm_instance.get_changesets(reverse=True)
 
        except EmptyRepositoryError as e:
 
            h.flash(unicode(e), category='warning')
 
            collection = []
 
        c.cs_pagination = Page(collection, page=p, items_per_page=size)
 
        page_revisions = [x.raw_id for x in list(c.cs_pagination)]
 
        c.cs_comments = c.db_repo.get_comments(page_revisions)
 
        c.cs_statuses = c.db_repo.statuses(page_revisions)
 

	
 
        c.ssh_repo_url = None
 
        if request.authuser.is_default_user:
 
            username = None
 
        else:
 
            username = request.authuser.username
 
            if c.ssh_enabled:
 
                c.ssh_repo_url = c.db_repo.clone_url(clone_uri_tmpl=c.clone_ssh_tmpl)
 

	
 
        c.clone_repo_url = c.db_repo.clone_url(clone_uri_tmpl=c.clone_uri_tmpl, with_id=False, username=username)
 
        c.clone_repo_url_id = c.db_repo.clone_url(clone_uri_tmpl=c.clone_uri_tmpl, with_id=True, username=username)
 

	
 
        if c.db_repo.enable_statistics:
 
            c.show_stats = True
 
        else:
 
            c.show_stats = False
 

	
 
        stats = Statistics.query() \
 
            .filter(Statistics.repository == c.db_repo) \
 
            .scalar()
 

	
 
        c.stats_percentage = 0
 

	
 
        if stats and stats.languages:
 
            c.no_data = False is c.db_repo.enable_statistics
 
            lang_stats_d = json.loads(stats.languages)
 

	
 
            lang_stats = [(x, {"count": y,
 
                               "desc": LANGUAGES_EXTENSIONS_MAP.get(x, '?')})
 
                          for x, y in lang_stats_d.items()]
 
            lang_stats.sort(key=lambda k: (-k[1]['count'], k[0]))
 

	
 
            c.trending_languages = lang_stats[:10]
 
        else:
 
            c.no_data = True
 
            c.trending_languages = []
 

	
 
        c.enable_downloads = c.db_repo.enable_downloads
 
        c.readme_data, c.readme_file = \
 
            self.__get_readme_data(c.db_repo)
 
        return render('summary/summary.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def repo_size(self, repo_name):
 
        if request.is_xhr:
 
            return c.db_repo._repo_size()
 
        else:
 
            raise HTTPBadRequest()
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def statistics(self, repo_name):
 
        if c.db_repo.enable_statistics:
 
            c.show_stats = True
 
            c.no_data_msg = _('No data ready yet')
 
        else:
 
            c.show_stats = False
 
            c.no_data_msg = _('Statistics are disabled for this repository')
 

	
 
        td = date.today() + timedelta(days=1)
 
        td_1m = td - timedelta(days=calendar.mdays[td.month])
 
        td_1y = td - timedelta(days=365)
 

	
 
        ts_min_m = mktime(td_1m.timetuple())
 
        ts_min_y = mktime(td_1y.timetuple())
 
        ts_max_y = mktime(td.timetuple())
 
        c.ts_min = ts_min_m
 
        c.ts_max = ts_max_y
 

	
 
        stats = Statistics.query() \
 
            .filter(Statistics.repository == c.db_repo) \
 
            .scalar()
 
        c.stats_percentage = 0
 
        if stats and stats.languages:
 
            c.no_data = False is c.db_repo.enable_statistics
 
            lang_stats_d = json.loads(stats.languages)
 
            c.commit_data = json.loads(stats.commit_activity)
 
            c.overview_data = json.loads(stats.commit_activity_combined)
 

	
 
            lang_stats = ((x, {"count": y,
 
                               "desc": LANGUAGES_EXTENSIONS_MAP.get(x)})
 
                          for x, y in lang_stats_d.items())
 

	
 
            c.trending_languages = (
 
                sorted(lang_stats, reverse=True, key=lambda k: k[1])[:10]
 
            )
 
            last_rev = stats.stat_on_revision + 1
 
            c.repo_last_rev = c.db_repo_scm_instance.count() \
 
                if c.db_repo_scm_instance.revisions else 0
 
            if last_rev == 0 or c.repo_last_rev == 0:
 
                pass
 
            else:
 
                c.stats_percentage = '%.2f' % ((float((last_rev)) /
 
                                                c.repo_last_rev) * 100)
 
        else:
 
            c.commit_data = {}
 
            c.overview_data = ([[ts_min_y, 0], [ts_max_y, 10]])
 
            c.trending_languages = {}
 
            c.no_data = True
 

	
 
        recurse_limit = 500  # don't recurse more than 500 times when parsing
 
        get_commits_stats(c.db_repo.repo_name, ts_min_y, ts_max_y, recurse_limit)
 
        return render('summary/statistics.html')
kallithea/lib/auth_modules/auth_internal.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_modules.auth_internal
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea authentication plugin for built in internal auth
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Nov 17, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 

	
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import formatted_json, hybrid_property
 
from kallithea.model.db import User
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaAuthPluginBase):
 
    def __init__(self):
 
        pass
 

	
 
    @hybrid_property
 
    def name(self):
 
        # Also found as kallithea.lib.model.db.User.DEFAULT_AUTH_TYPE
 
        return 'internal'
 

	
 
    def settings(self):
 
        return []
 

	
 
    def accepts(self, user, accepts_empty=True):
 
        """
 
        Custom accepts for this auth that doesn't accept empty users. We
 
        know that user exists in database.
 
        """
 
        return super(KallitheaAuthPlugin, self).accepts(user,
 
                                                        accepts_empty=False)
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        if not userobj:
 
            log.debug('userobj was:%s skipping', userobj)
 
            return None
 
        if userobj.extern_type != self.name:
 
            log.warning("userobj:%s extern_type mismatch got:`%s` expected:`%s`",
 
                     userobj, userobj.extern_type, self.name)
 
            return None
 
        if not username:
 
            log.debug('Empty username - skipping...')
 
            return None
 

	
 
        user_data = {
 
            "username": userobj.username,
 
            "firstname": userobj.firstname,
 
            "lastname": userobj.lastname,
 
            "groups": [],
 
            "email": userobj.email,
 
            "admin": userobj.admin,
 
            "extern_name": userobj.user_id,
 
        }
 
        log.debug(formatted_json(user_data))
 

	
 
        from kallithea.lib import auth
 
        password_match = auth.check_password(password, userobj.password)
 
        if userobj.is_default_user:
 
            log.info('user %s authenticated correctly as anonymous user',
 
                     username)
 
            return user_data
 

	
 
        elif userobj.username == username and password_match:
 
            log.info('user %s authenticated correctly', user_data['username'])
 
            return user_data
 

	
 
        log.error("user %s had a bad password", username)
 
        return None
 

	
 
    def get_managed_fields(self):
 
        # Note: 'username' should only be editable (at least for user) if self registration is enabled
 
        return []
kallithea/lib/compat.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.compat
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Python backward compatibility functions and common libs
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Oct 7, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import functools
 
import os
 
import sys
 

	
 
#==============================================================================
 
# Hybrid property/method
 
#==============================================================================
 
from sqlalchemy.ext.hybrid import hybrid_property
 
#==============================================================================
 
# OrderedSet
 
#==============================================================================
 
from sqlalchemy.util import OrderedSet
 

	
 
#==============================================================================
 
# json
 
#==============================================================================
 
from kallithea.lib.ext_json import json
 

	
 

	
 
# alias for formatted json
 
formatted_json = functools.partial(json.dumps, indent=4, sort_keys=True)
 

	
 

	
 

	
 

	
 

	
 

	
 
#==============================================================================
 
# kill
 
#==============================================================================
 
if os.name == 'nt': # Windows
 
    import ctypes
 

	
 
    def kill(pid, sig):
 
        """kill function for Win32"""
 
        kernel32 = ctypes.windll.kernel32
 
        handle = kernel32.OpenProcess(1, 0, pid)
 
        return (0 != kernel32.TerminateProcess(handle, 0))
 

	
 
else:
 
    kill = os.kill
kallithea/lib/vcs/backends/git/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import os
 

	
 
from kallithea.lib.hooks import log_pull_action
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 
from kallithea.lib.vcs.utils import safe_str, safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitSshHandler(BaseSshHandler):
 
    vcs_type = 'git'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> GitSshHandler.make(shlex.split("git-upload-pack '/foo bar'")).repo_name
 
        u'foo bar'
 
        >>> GitSshHandler.make(shlex.split("git-upload-pack '/foo bar'")).verb
 
        'git-upload-pack'
 
        >>> GitSshHandler.make(shlex.split(" git-upload-pack /blåbærgrød ")).repo_name # might not be necessary to support no quoting ... but we can
 
        u'bl\xe5b\xe6rgr\xf8d'
 
        >>> GitSshHandler.make(shlex.split('''git-upload-pack "/foo'bar"''')).repo_name
 
        u"foo'bar"
 
        >>> GitSshHandler.make(shlex.split("git-receive-pack '/foo'")).repo_name
 
        u'foo'
 
        >>> GitSshHandler.make(shlex.split("git-receive-pack '/foo'")).verb
 
        'git-receive-pack'
 

	
 
        >>> GitSshHandler.make(shlex.split("/bin/git-upload-pack '/foo'")) # ssh-serve will report 'SSH command %r is not supported'
 
        >>> GitSshHandler.make(shlex.split('''git-upload-pack /foo bar''')) # ssh-serve will report 'SSH command %r is not supported'
 
        >>> shlex.split("git-upload-pack '/foo'bar' x") # ssh-serve will report: Error parsing SSH command "...": No closing quotation
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> GitSshHandler.make(shlex.split('hg -R foo serve --stdio')) # not handled here
 
        """
 
        if (len(ssh_command_parts) == 2 and
 
            ssh_command_parts[0] in ['git-upload-pack', 'git-receive-pack'] and
 
            ssh_command_parts[1].startswith('/')
 
        ):
 
            return cls(safe_unicode(ssh_command_parts[1][1:]), ssh_command_parts[0])
 

	
 
        return None
 

	
 
    def __init__(self, repo_name, verb):
 
        self.repo_name = repo_name
 
        self.verb = verb
 

	
 
    def _serve(self):
 
        if self.verb == 'git-upload-pack': # action 'pull'
 
            # base class called set_hook_environment - action is hardcoded to 'pull'
 
            log_pull_action(ui=make_ui(), repo=self.db_repo.scm_instance._repo)
 
        else: # probably verb 'git-receive-pack', action 'push'
 
            if not self.allow_push:
 
                self.exit('Push access to %r denied' % safe_str(self.repo_name))
 
            # Note: push logging is handled by Git post-receive hook
 

	
 
        # git shell is not a real shell but use shell inspired quoting *inside* the argument.
 
        # Per https://github.com/git/git/blob/v2.22.0/quote.c#L12 :
 
        # The path must be "'" quoted, but "'" and "!" must exit the quoting and be "\" escaped
 
        quoted_abspath = "'%s'" % self.db_repo.repo_full_path.replace("'", r"'\''").replace("!", r"'\!'")
 
        newcmd = ['git', 'shell', '-c', "%s %s" % (self.verb, quoted_abspath)]
 
        log.debug('Serving: %s', newcmd)
 
        os.execvp(newcmd[0], newcmd)
 
        self.exit("Failed to exec 'git' as %s" % newcmd)
kallithea/lib/vcs/backends/hg/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 

	
 
from mercurial import hg
 
from mercurial.wireprotoserver import sshserver
 

	
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 
from kallithea.lib.vcs.utils import safe_str, safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialSshHandler(BaseSshHandler):
 
    vcs_type = 'hg'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> MercurialSshHandler.make(shlex.split('hg -R "foo bar" serve --stdio')).repo_name
 
        u'foo bar'
 
        >>> MercurialSshHandler.make(shlex.split(' hg -R blåbærgrød serve --stdio ')).repo_name
 
        u'bl\xe5b\xe6rgr\xf8d'
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R 'foo"bar' serve --stdio''')).repo_name
 
        u'foo"bar'
 

	
 
        >>> MercurialSshHandler.make(shlex.split('/bin/hg -R "foo" serve --stdio'))
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R "foo"bar" serve --stdio''')) # ssh-serve will report: Error parsing SSH command "...": invalid syntax
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> MercurialSshHandler.make(shlex.split('git-upload-pack "/foo"')) # not handled here
 
        """
 
        if ssh_command_parts[:2] == ['hg', '-R'] and ssh_command_parts[3:] == ['serve', '--stdio']:
 
            return cls(safe_unicode(ssh_command_parts[2]))
 

	
 
        return None
 

	
 
    def __init__(self, repo_name):
 
        self.repo_name = repo_name
 

	
 
    def _serve(self):
 
        # Note: we want a repo with config based on .hg/hgrc and can thus not use self.db_repo.scm_instance._repo.ui
 
        baseui = make_ui(repo_path=self.db_repo.repo_full_path)
 
        if not self.allow_push:
 
            baseui.setconfig('hooks', 'pretxnopen._ssh_reject', 'python:kallithea.lib.hooks.rejectpush')
 
            baseui.setconfig('hooks', 'prepushkey._ssh_reject', 'python:kallithea.lib.hooks.rejectpush')
 

	
 
        repo = hg.repository(baseui, safe_str(self.db_repo.repo_full_path))
 
        log.debug("Starting Mercurial sshserver for %s", self.db_repo.repo_full_path)
 
        sshserver(baseui, repo).serve_forever()
kallithea/lib/vcs/backends/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
vcs.backends.ssh
 
~~~~~~~~~~~~~~~~~
 

	
 
SSH backend for all available SCMs
 
"""
 

	
 
import datetime
 
import logging
 
import sys
 

	
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyMiddleware
 
from kallithea.lib.utils2 import safe_str, set_hook_environment
 
from kallithea.lib.utils2 import set_hook_environment
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.model.db import Repository, User, UserSshKeys
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class BaseSshHandler(object):
 
    # Protocol for setting properties:
 
    # Set by sub class:
 
    #   vcs_type: 'hg' or 'git'
 
    # Set by make() / __init__():
 
    #   repo_name: requested repo name - only validated by serve()
 
    # Set by serve() - must not be accessed before:
 
    #   db_repo: repository db object
 
    #   authuser: user that has been authenticated - like request.authuser ... which isn't used here
 
    #   allow_push: false for read-only access to the repo
 

	
 
    # Set defaults, in case .exit should be called early
 
    vcs_type = None
 
    repo_name = None
 

	
 
    @staticmethod
 
    def make(ssh_command):
 
        """Factory function. Given a command as invoked over SSH (and preserved
 
        in SSH_ORIGINAL_COMMAND when run as authorized_keys command), return a
 
        handler if the command looks ok, else return None.
 
        """
 
        raise NotImplementedError
 

	
 
    def serve(self, user_id, key_id, client_ip):
 
        """Verify basic sanity of the repository, and that the user is
 
        valid and has access - then serve the native VCS protocol for
 
        repository access."""
 
        dbuser = User.get(user_id)
 
        if dbuser is None:
 
            self.exit('User %r not found' % user_id)
 
        self.authuser = AuthUser.make(dbuser=dbuser, ip_addr=client_ip)
 
        log.info('Authorized user %s from SSH %s trusting user id %s and key id %s for %r', dbuser, client_ip, user_id, key_id, self.repo_name)
 
        if self.authuser is None: # not ok ... but already kind of authenticated by SSH ... but not really not authorized ...
 
            self.exit('User %s from %s cannot be authorized' % (dbuser.username, client_ip))
 

	
 
        ssh_key = UserSshKeys.get(key_id)
 
        if ssh_key is None:
 
            self.exit('SSH key %r not found' % key_id)
 
        ssh_key.last_seen = datetime.datetime.now()
 
        Session().commit()
 

	
 
        if HasPermissionAnyMiddleware('repository.write',
 
                                      'repository.admin')(self.authuser, self.repo_name):
 
            self.allow_push = True
 
        elif HasPermissionAnyMiddleware('repository.read')(self.authuser, self.repo_name):
 
            self.allow_push = False
 
        else:
 
            self.exit('Access to %r denied' % safe_str(self.repo_name))
 

	
 
        self.db_repo = Repository.get_by_repo_name(self.repo_name)
 
        if self.db_repo is None:
 
            self.exit("Repository '%s' not found" % self.repo_name)
 
        assert self.db_repo.repo_name == self.repo_name
 

	
 
        # Set global hook environment up for 'push' actions.
 
        # If pull actions should be served, the actual hook invocation will be
 
        # hardcoded to 'pull' when log_pull_action is invoked (directly on Git,
 
        # or through the Mercurial 'outgoing' hook).
 
        # For push actions, the action in global hook environment is used (in
 
        # handle_git_post_receive when it is called as Git post-receive hook,
 
        # or in log_push_action through the Mercurial 'changegroup' hook).
 
        set_hook_environment(self.authuser.username, client_ip, self.repo_name, self.vcs_type, 'push')
 
        return self._serve()
 

	
 
    def _serve(self):
 
        """Serve the native protocol for repository access."""
 
        raise NotImplementedError
 

	
 
    def exit(self, error):
 
        log.info('abort serving %s %s: %s', self.vcs_type, self.repo_name, error)
 
        sys.stderr.write('abort: %s\n' % error)
 
        sys.exit(1)
kallithea/model/repo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.repo
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Repository model for kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 5, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import logging
 
import os
 
import shutil
 
import traceback
 
from datetime import datetime
 

	
 
import kallithea.lib.utils2
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevel, HasUserGroupPermissionLevel
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.lib.hooks import log_delete_repository
 
from kallithea.lib.utils import is_valid_repo_uri, make_ui
 
from kallithea.lib.utils2 import LazyProperty, get_current_authuser, obfuscate_url_pw, remove_prefix, safe_str, safe_unicode
 
from kallithea.lib.vcs.backends import get_backend
 
from kallithea.model.db import (
 
    Permission, RepoGroup, Repository, RepositoryField, Session, Statistics, Ui, User, UserGroup, UserGroupRepoGroupToPerm, UserGroupRepoToPerm, UserRepoGroupToPerm, UserRepoToPerm)
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoModel(object):
 

	
 
    URL_SEPARATOR = Repository.url_sep()
 

	
 
    def _create_default_perms(self, repository, private):
 
        # create default permission
 
        default = 'repository.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('repository.'):
 
                default = p.permission.permission_name
 
                break
 

	
 
        default_perm = 'repository.none' if private else default
 

	
 
        repo_to_perm = UserRepoToPerm()
 
        repo_to_perm.permission = Permission.get_by_key(default_perm)
 

	
 
        repo_to_perm.repository = repository
 
        repo_to_perm.user_id = def_user.user_id
 
        Session().add(repo_to_perm)
 

	
 
        return repo_to_perm
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = Ui.query().filter(Ui.ui_key == '/').one()
 
        return q.ui_value
 

	
 
    def get(self, repo_id, cache=False):
 
        repo = Repository.query() \
 
            .filter(Repository.repo_id == repo_id)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_id))
 
        return repo.scalar()
 

	
 
    def get_repo(self, repository):
 
        return Repository.guess_instance(repository)
 

	
 
    def get_by_repo_name(self, repo_name, cache=False):
 
        repo = Repository.query() \
 
            .filter(Repository.repo_name == repo_name)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_name))
 
        return repo.scalar()
 

	
 
    def get_all_user_repos(self, user):
 
        """
 
        Gets all repositories that user have at least read access
 

	
 
        :param user:
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        user = User.guess_instance(user)
 
        repos = AuthUser(dbuser=user).permissions['repositories']
 
        access_check = lambda r: r[1] in ['repository.read',
 
                                          'repository.write',
 
                                          'repository.admin']
 
        repos = [x[0] for x in filter(access_check, repos.items())]
 
        return Repository.query().filter(Repository.repo_name.in_(repos))
 

	
 
    @classmethod
 
    def _render_datatable(cls, tmpl, *args, **kwargs):
 
        import kallithea
 
        from tg import tmpl_context as c, request, app_globals
 
        from tg.i18n import ugettext as _
 

	
 
        _tmpl_lookup = app_globals.mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        tmpl = template.get_def(tmpl)
 
        kwargs.update(dict(_=_, h=h, c=c, request=request))
 
        return tmpl.render_unicode(*args, **kwargs)
 

	
 
    def get_repos_as_dict(self, repos_list, repo_groups_list=None,
 
                          admin=False,
 
                          short_name=False):
 
        """Return repository list for use by DataTable.
 
        repos_list: list of repositories - but will be filtered for read permission.
 
        repo_groups_list: added at top of list without permission check.
 
        admin: return data for action column.
 
        """
 
        _render = self._render_datatable
 
        from tg import tmpl_context as c
 

	
 
        def repo_lnk(name, rtype, rstate, private, fork_of):
 
            return _render('repo_name', name, rtype, rstate, private, fork_of,
 
                           short_name=short_name)
 

	
 
        def last_change(last_change):
 
            return _render("last_change", last_change)
 

	
 
        def rss_lnk(repo_name):
 
            return _render("rss", repo_name)
 

	
 
        def atom_lnk(repo_name):
 
            return _render("atom", repo_name)
 

	
 
        def last_rev(repo_name, cs_cache):
 
            return _render('revision', repo_name, cs_cache.get('revision'),
 
                           cs_cache.get('raw_id'), cs_cache.get('author'),
 
                           cs_cache.get('message'))
 

	
 
        def desc(desc):
 
            return h.urlify_text(desc, truncate=80, stylize=c.visual.stylify_metalabels)
 

	
 
        def state(repo_state):
 
            return _render("repo_state", repo_state)
 

	
 
        def repo_actions(repo_name):
 
            return _render('repo_actions', repo_name)
 

	
 
        def owner_actions(owner_id, username):
 
            return _render('user_name', owner_id, username)
 

	
 
        repos_data = []
 

	
 
        for gr in repo_groups_list or []:
 
            repos_data.append(dict(
 
                raw_name='\0' + gr.name, # sort before repositories
 
                just_name=gr.name,
 
                name=_render('group_name_html', group_name=gr.group_name, name=gr.name),
 
                desc=gr.group_description))
 

	
 
        for repo in repos_list:
 
            if not HasRepoPermissionLevel('read')(repo.repo_name, 'get_repos_as_dict check'):
 
                continue
 
            cs_cache = repo.changeset_cache
 
            row = {
 
                "raw_name": repo.repo_name,
 
                "just_name": repo.just_name,
 
                "name": repo_lnk(repo.repo_name, repo.repo_type,
 
                                 repo.repo_state, repo.private, repo.fork),
 
                "last_change_iso": repo.last_db_change.isoformat(),
 
                "last_change": last_change(repo.last_db_change),
 
                "last_changeset": last_rev(repo.repo_name, cs_cache),
 
                "last_rev_raw": cs_cache.get('revision'),
 
                "desc": desc(repo.description),
 
                "owner": h.person(repo.owner),
 
                "state": state(repo.repo_state),
 
                "rss": rss_lnk(repo.repo_name),
 
                "atom": atom_lnk(repo.repo_name),
 
            }
 
            if admin:
 
                row.update({
 
                    "action": repo_actions(repo.repo_name),
 
                    "owner": owner_actions(repo.owner_id,
 
                                           h.person(repo.owner))
 
                })
 
            repos_data.append(row)
 

	
 
        return {
 
            "sort": "name",
 
            "dir": "asc",
 
            "records": repos_data
 
        }
 

	
 
    def _get_defaults(self, repo_name):
 
        """
 
        Gets information about repository, and returns a dict for
 
        usage in forms
 

	
 
        :param repo_name:
 
        """
 

	
 
        repo_info = Repository.get_by_repo_name(repo_name)
 

	
 
        if repo_info is None:
 
            return None
 

	
 
        defaults = repo_info.get_dict()
 
        defaults['repo_name'] = repo_info.just_name
 
        defaults['repo_group'] = repo_info.group_id
 

	
 
        for strip, k in [(0, 'repo_type'), (1, 'repo_enable_downloads'),
 
                         (1, 'repo_description'),
 
                         (1, 'repo_landing_rev'), (0, 'clone_uri'),
 
                         (1, 'repo_private'), (1, 'repo_enable_statistics')]:
 
            attr = k
 
            if strip:
 
                attr = remove_prefix(k, 'repo_')
 

	
 
            val = defaults[attr]
 
            if k == 'repo_landing_rev':
 
                val = ':'.join(defaults[attr])
 
            defaults[k] = val
 
            if k == 'clone_uri':
 
                defaults['clone_uri_hidden'] = repo_info.clone_uri_hidden
 

	
 
        # fill owner
 
        if repo_info.owner:
 
            defaults.update({'owner': repo_info.owner.username})
 
        else:
 
            replacement_user = User.query().filter(User.admin ==
 
                                                   True).first().username
 
            defaults.update({'owner': replacement_user})
 

	
 
        # fill repository users
 
        for p in repo_info.repo_to_perm:
 
            defaults.update({'u_perm_%s' % p.user.username:
 
                                 p.permission.permission_name})
 

	
 
        # fill repository groups
 
        for p in repo_info.users_group_to_perm:
 
            defaults.update({'g_perm_%s' % p.users_group.users_group_name:
 
                                 p.permission.permission_name})
 

	
 
        return defaults
 

	
 
    def update(self, repo, **kwargs):
 
        try:
 
            cur_repo = Repository.guess_instance(repo)
 
            org_repo_name = cur_repo.repo_name
 
            if 'owner' in kwargs:
 
                cur_repo.owner = User.get_by_username(kwargs['owner'])
 

	
 
            if 'repo_group' in kwargs:
 
                assert kwargs['repo_group'] != u'-1', kwargs # RepoForm should have converted to None
 
                cur_repo.group = RepoGroup.get(kwargs['repo_group'])
 
                cur_repo.repo_name = cur_repo.get_new_name(cur_repo.just_name)
 
            log.debug('Updating repo %s with params:%s', cur_repo, kwargs)
 
            for k in ['repo_enable_downloads',
 
                      'repo_description',
 
                      'repo_landing_rev',
 
                      'repo_private',
 
                      'repo_enable_statistics',
 
                      ]:
 
                if k in kwargs:
 
                    setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k])
 
            clone_uri = kwargs.get('clone_uri')
 
            if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden:
 
                # clone_uri is modified - if given a value, check it is valid
 
                if clone_uri != '':
 
                    # will raise exception on error
 
                    is_valid_repo_uri(cur_repo.repo_type, clone_uri, make_ui())
 
                cur_repo.clone_uri = clone_uri
 

	
 
            if 'repo_name' in kwargs:
 
                repo_name = kwargs['repo_name']
 
                if kallithea.lib.utils2.repo_name_slug(repo_name) != repo_name:
 
                    raise Exception('invalid repo name %s' % repo_name)
 
                cur_repo.repo_name = cur_repo.get_new_name(repo_name)
 

	
 
            # if private flag is set, reset default permission to NONE
 
            if kwargs.get('repo_private'):
 
                EMPTY_PERM = 'repository.none'
 
                RepoModel().grant_user_permission(
 
                    repo=cur_repo, user='default', perm=EMPTY_PERM
 
                )
 
                # handle extra fields
 
            for field in [k for k in kwargs if k.startswith(RepositoryField.PREFIX)]:
 
                k = RepositoryField.un_prefix_key(field)
 
                ex_field = RepositoryField.get_by_key_name(key=k, repo=cur_repo)
 
                if ex_field:
 
                    ex_field.field_value = kwargs[field]
 

	
 
            if org_repo_name != cur_repo.repo_name:
 
                # rename repository
 
                self._rename_filesystem_repo(old=org_repo_name, new=cur_repo.repo_name)
 

	
 
            return cur_repo
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _create_repo(self, repo_name, repo_type, description, owner,
 
                     private=False, clone_uri=None, repo_group=None,
 
                     landing_rev='rev:tip', fork_of=None,
 
                     copy_fork_permissions=False, enable_statistics=False,
 
                     enable_downloads=False,
 
                     copy_group_permissions=False, state=Repository.STATE_PENDING):
 
        """
 
        Create repository inside database with PENDING state. This should only be
 
        executed by create() repo, with exception of importing existing repos.
 

	
 
        """
 
        from kallithea.model.scm import ScmModel
 

	
 
        owner = User.guess_instance(owner)
 
        fork_of = Repository.guess_instance(fork_of)
 
        repo_group = RepoGroup.guess_instance(repo_group)
 
        try:
 
            repo_name = safe_unicode(repo_name)
 
            description = safe_unicode(description)
 
            # repo name is just a name of repository
 
            # while repo_name_full is a full qualified name that is combined
 
            # with name and path of group
 
            repo_name_full = repo_name
 
            repo_name = repo_name.split(self.URL_SEPARATOR)[-1]
 
            if kallithea.lib.utils2.repo_name_slug(repo_name) != repo_name:
 
                raise Exception('invalid repo name %s' % repo_name)
 

	
 
            new_repo = Repository()
 
            new_repo.repo_state = state
 
            new_repo.enable_statistics = False
 
            new_repo.repo_name = repo_name_full
 
            new_repo.repo_type = repo_type
 
            new_repo.owner = owner
 
            new_repo.group = repo_group
 
            new_repo.description = description or repo_name
 
            new_repo.private = private
 
            if clone_uri:
 
                # will raise exception on error
 
                is_valid_repo_uri(repo_type, clone_uri, make_ui())
 
            new_repo.clone_uri = clone_uri
 
            new_repo.landing_rev = landing_rev
 

	
 
            new_repo.enable_statistics = enable_statistics
 
            new_repo.enable_downloads = enable_downloads
 

	
 
            if fork_of:
 
                parent_repo = fork_of
 
                new_repo.fork = parent_repo
 

	
 
            Session().add(new_repo)
 

	
 
            if fork_of and copy_fork_permissions:
 
                repo = fork_of
 
                user_perms = UserRepoToPerm.query() \
 
                    .filter(UserRepoToPerm.repository == repo).all()
 
                group_perms = UserGroupRepoToPerm.query() \
 
                    .filter(UserGroupRepoToPerm.repository == repo).all()
 

	
 
                for perm in user_perms:
 
                    UserRepoToPerm.create(perm.user, new_repo, perm.permission)
 

	
 
                for perm in group_perms:
 
                    UserGroupRepoToPerm.create(perm.users_group, new_repo,
 
                                               perm.permission)
 

	
 
            elif repo_group and copy_group_permissions:
 

	
 
                user_perms = UserRepoGroupToPerm.query() \
 
                    .filter(UserRepoGroupToPerm.group == repo_group).all()
 

	
 
                group_perms = UserGroupRepoGroupToPerm.query() \
 
                    .filter(UserGroupRepoGroupToPerm.group == repo_group).all()
 

	
 
                for perm in user_perms:
 
                    perm_name = perm.permission.permission_name.replace('group.', 'repository.')
 
                    perm_obj = Permission.get_by_key(perm_name)
 
                    UserRepoToPerm.create(perm.user, new_repo, perm_obj)
 

	
 
                for perm in group_perms:
 
                    perm_name = perm.permission.permission_name.replace('group.', 'repository.')
 
                    perm_obj = Permission.get_by_key(perm_name)
 
                    UserGroupRepoToPerm.create(perm.users_group, new_repo, perm_obj)
 

	
 
            else:
 
                self._create_default_perms(new_repo, private)
 

	
 
            # now automatically start following this repository as owner
 
            ScmModel().toggle_following_repo(new_repo.repo_id, owner.user_id)
 
            # we need to flush here, in order to check if database won't
 
            # throw any exceptions, create filesystem dirs at the very end
 
            Session().flush()
 
            return new_repo
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create(self, form_data, cur_user):
 
        """
 
        Create repository using celery tasks
 

	
 
        :param form_data:
 
        :param cur_user:
 
        """
 
        from kallithea.lib.celerylib import tasks
 
        return tasks.create_repo(form_data, cur_user)
 

	
 
    def _update_permissions(self, repo, perms_new=None, perms_updates=None,
 
                            check_perms=True):
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        # update permissions
 
        for member, perm, member_type in perms_updates:
 
            if member_type == 'user':
 
                # this updates existing one
 
                self.grant_user_permission(
 
                    repo=repo, user=member, perm=perm
 
                )
 
            else:
 
                # check if we have permissions to alter this usergroup's access
 
                if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                    self.grant_user_group_permission(
 
                        repo=repo, group_name=member, perm=perm
 
                    )
 
            # set new permissions
 
        for member, perm, member_type in perms_new:
 
            if member_type == 'user':
 
                self.grant_user_permission(
 
                    repo=repo, user=member, perm=perm
 
                )
 
            else:
 
                # check if we have permissions to alter this usergroup's access
 
                if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                    self.grant_user_group_permission(
 
                        repo=repo, group_name=member, perm=perm
 
                    )
 

	
 
    def create_fork(self, form_data, cur_user):
 
        """
 
        Simple wrapper into executing celery task for fork creation
 

	
 
        :param form_data:
 
        :param cur_user:
 
        """
 
        from kallithea.lib.celerylib import tasks
 
        return tasks.create_repo_fork(form_data, cur_user)
 

	
 
    def delete(self, repo, forks=None, fs_remove=True, cur_user=None):
 
        """
 
        Delete given repository, forks parameter defines what do do with
 
        attached forks. Throws AttachedForksError if deleted repo has attached
 
        forks
 

	
 
        :param repo:
 
        :param forks: str 'delete' or 'detach'
 
        :param fs_remove: remove(archive) repo from filesystem
 
        """
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 
        repo = Repository.guess_instance(repo)
 
        if repo is not None:
 
            if forks == 'detach':
 
                for r in repo.forks:
 
                    r.fork = None
 
            elif forks == 'delete':
 
                for r in repo.forks:
 
                    self.delete(r, forks='delete')
 
            elif [f for f in repo.forks]:
 
                raise AttachedForksError()
 

	
 
            old_repo_dict = repo.get_dict()
 
            try:
 
                Session().delete(repo)
 
                if fs_remove:
 
                    self._delete_filesystem_repo(repo)
 
                else:
 
                    log.debug('skipping removal from filesystem')
 
                log_delete_repository(old_repo_dict,
 
                                      deleted_by=cur_user)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise
 

	
 
    def grant_user_permission(self, repo, user, perm):
 
        """
 
        Grant permission for user on given repository, or update existing one
 
        if found
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        user = User.guess_instance(user)
 
        repo = Repository.guess_instance(repo)
 
        permission = Permission.guess_instance(perm)
 

	
 
        # check if we have that permission already
 
        obj = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.user == user) \
 
            .filter(UserRepoToPerm.repository == repo) \
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserRepoToPerm()
 
            Session().add(obj)
 
        obj.repository = repo
 
        obj.user = user
 
        obj.permission = permission
 
        log.debug('Granted perm %s to %s on %s', perm, user, repo)
 
        return obj
 

	
 
    def revoke_user_permission(self, repo, user):
 
        """
 
        Revoke permission for user on given repository
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        user = User.guess_instance(user)
 
        repo = Repository.guess_instance(repo)
 

	
 
        obj = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository == repo) \
 
            .filter(UserRepoToPerm.user == user) \
 
            .scalar()
 
        if obj is not None:
 
            Session().delete(obj)
 
            log.debug('Revoked perm on %s on %s', repo, user)
 

	
 
    def grant_user_group_permission(self, repo, group_name, perm):
 
        """
 
        Grant permission for user group on given repository, or update
 
        existing one if found
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or user group name
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        repo = Repository.guess_instance(repo)
 
        group_name = UserGroup.guess_instance(group_name)
 
        permission = Permission.guess_instance(perm)
 

	
 
        # check if we have that permission already
 
        obj = UserGroupRepoToPerm.query() \
 
            .filter(UserGroupRepoToPerm.users_group == group_name) \
 
            .filter(UserGroupRepoToPerm.repository == repo) \
 
            .scalar()
 

	
 
        if obj is None:
 
            # create new
 
            obj = UserGroupRepoToPerm()
 
            Session().add(obj)
 

	
 
        obj.repository = repo
 
        obj.users_group = group_name
 
        obj.permission = permission
 
        log.debug('Granted perm %s to %s on %s', perm, group_name, repo)
 
        return obj
 

	
 
    def revoke_user_group_permission(self, repo, group_name):
 
        """
 
        Revoke permission for user group on given repository
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or user group name
 
        """
 
        repo = Repository.guess_instance(repo)
 
        group_name = UserGroup.guess_instance(group_name)
 

	
 
        obj = UserGroupRepoToPerm.query() \
 
            .filter(UserGroupRepoToPerm.repository == repo) \
 
            .filter(UserGroupRepoToPerm.users_group == group_name) \
 
            .scalar()
 
        if obj is not None:
 
            Session().delete(obj)
 
            log.debug('Revoked perm to %s on %s', repo, group_name)
 

	
 
    def delete_stats(self, repo_name):
 
        """
 
        removes stats for given repo
 

	
 
        :param repo_name:
 
        """
 
        repo = Repository.guess_instance(repo_name)
 
        try:
 
            obj = Statistics.query() \
 
                .filter(Statistics.repository == repo).scalar()
 
            if obj is not None:
 
                Session().delete(obj)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _create_filesystem_repo(self, repo_name, repo_type, repo_group,
 
                                clone_uri=None, repo_store_location=None):
 
        """
 
        Makes repository on filesystem. Operation is group aware, meaning that it will create
 
        a repository within a group, and alter the paths accordingly to the group location.
 

	
 
        Note: clone_uri is low level and not validated - it might be a file system path used for validated cloning
 
        """
 
        from kallithea.lib.utils import is_valid_repo, is_valid_repo_group
 
        from kallithea.model.scm import ScmModel
 

	
 
        if '/' in repo_name:
 
            raise ValueError('repo_name must not contain groups got `%s`' % repo_name)
 

	
 
        if isinstance(repo_group, RepoGroup):
 
            new_parent_path = os.sep.join(repo_group.full_path_splitted)
 
        else:
 
            new_parent_path = repo_group or ''
 

	
 
        if repo_store_location:
 
            _paths = [repo_store_location]
 
        else:
 
            _paths = [self.repos_path, new_parent_path, repo_name]
 
            # we need to make it str for mercurial
 
        repo_path = os.path.join(*(safe_str(x) for x in _paths))
 

	
 
        # check if this path is not a repository
 
        if is_valid_repo(repo_path, self.repos_path):
 
            raise Exception('This path %s is a valid repository' % repo_path)
 

	
 
        # check if this path is a group
 
        if is_valid_repo_group(repo_path, self.repos_path):
 
            raise Exception('This path %s is a valid group' % repo_path)
 

	
 
        log.info('creating repo %s in %s from url: `%s`',
 
            repo_name, safe_unicode(repo_path),
 
            obfuscate_url_pw(clone_uri))
 

	
 
        backend = get_backend(repo_type)
 

	
 
        if repo_type == 'hg':
 
            baseui = make_ui()
 
            # patch and reset hooks section of UI config to not run any
 
            # hooks on creating remote repo
 
            for k, v in baseui.configitems('hooks'):
 
                baseui.setconfig('hooks', k, None)
 

	
 
            repo = backend(repo_path, create=True, src_url=clone_uri, baseui=baseui)
 
        elif repo_type == 'git':
 
            repo = backend(repo_path, create=True, src_url=clone_uri, bare=True)
 
            # add kallithea hook into this repo
 
            ScmModel().install_git_hooks(repo=repo)
 
        else:
 
            raise Exception('Not supported repo_type %s expected hg/git' % repo_type)
 

	
 
        log.debug('Created repo %s with %s backend',
 
                  safe_unicode(repo_name), safe_unicode(repo_type))
 
        return repo
 

	
 
    def _rename_filesystem_repo(self, old, new):
 
        """
 
        renames repository on filesystem
 

	
 
        :param old: old name
 
        :param new: new name
 
        """
 
        log.info('renaming repo from %s to %s', old, new)
 

	
 
        old_path = safe_str(os.path.join(self.repos_path, old))
 
        new_path = safe_str(os.path.join(self.repos_path, new))
 
        if os.path.isdir(new_path):
 
            raise Exception(
 
                'Was trying to rename to already existing dir %s' % new_path
 
            )
 
        shutil.move(old_path, new_path)
 

	
 
    def _delete_filesystem_repo(self, repo):
 
        """
 
        removes repo from filesystem, the removal is actually done by
 
        renaming dir to a 'rm__*' prefix which Kallithea will skip.
 
        It can be undeleted later by reverting the rename.
 

	
 
        :param repo: repo object
 
        """
 
        rm_path = safe_str(os.path.join(self.repos_path, repo.repo_name))
 
        log.info("Removing %s", rm_path)
 

	
 
        _now = datetime.now()
 
        _ms = str(_now.microsecond).rjust(6, '0')
 
        _d = 'rm__%s__%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                             repo.just_name)
 
        if repo.group:
 
            args = repo.group.full_path_splitted + [_d]
 
            _d = os.path.join(*args)
 
        if os.path.exists(rm_path):
 
            shutil.move(rm_path, safe_str(os.path.join(self.repos_path, _d)))
 
        else:
 
            log.error("Can't find repo to delete in %r", rm_path)
kallithea/tests/vcs/test_hg.py
Show inline comments
 
import os
 

	
 
import mock
 
import pytest
 

	
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.vcs.backends.hg import MercurialChangeset, MercurialRepository
 
from kallithea.lib.vcs.exceptions import NodeDoesNotExistError, RepositoryError, VCSError
 
from kallithea.lib.vcs.nodes import NodeKind, NodeState
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_HG_REPO_CLONE, TEST_HG_REPO_PULL, TESTS_TMP_PATH
 

	
 

	
 
class TestMercurialRepository(object):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_HG_REPO_CLONE):
 
            pytest.fail('Cannot test mercurial clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_HG_REPO_CLONE)
 

	
 
    def setup_method(self):
 
        self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = os.path.join(TESTS_TMP_PATH, 'errorrepo')
 
        with pytest.raises(RepositoryError):
 
            MercurialRepository(wrong_repo_path)
 

	
 
    def test_unicode_path_repo(self):
 
        with pytest.raises(VCSError):
 
            MercurialRepository(u'iShouldFail')
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = MercurialRepository(safe_str(TEST_HG_REPO))
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        assert len(repo.revisions) == len(repo_clone.revisions)
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            assert raw_id == repo_clone.get_changeset(raw_id).raw_id
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = MercurialRepository(safe_str(TEST_HG_REPO))
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update',
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        assert len(repo.revisions) == len(repo_clone.revisions)
 

	
 
        # check if current workdir was updated
 
        assert os.path.isfile(
 
            os.path.join(
 
                TEST_HG_REPO_CLONE + '_w_update', 'MANIFEST.in'
 
            )
 
        )
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = MercurialRepository(safe_str(TEST_HG_REPO))
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update',
 
            src_url=TEST_HG_REPO, update_after_clone=False)
 
        assert len(repo.revisions) == len(repo_clone.revisions)
 
        assert not os.path.isfile(
 
            os.path.join(
 
                TEST_HG_REPO_CLONE + '_wo_update', 'MANIFEST.in'
 
            )
 
        )
 

	
 
    def test_pull(self):
 
        if os.path.exists(TEST_HG_REPO_PULL):
 
            pytest.fail('Cannot test mercurial pull command as location %s '
 
                      'already exists. You should manually remove it first'
 
                      % TEST_HG_REPO_PULL)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL, create=True)
 
        assert len(self.repo.revisions) > len(repo_new.revisions)
 

	
 
        repo_new.pull(self.repo.path)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL)
 
        assert len(self.repo.revisions) == len(repo_new.revisions)
 

	
 
    def test_revisions(self):
 
        # there are 21 revisions at bitbucket now
 
        # so we can assume they would be available from now on
 
        subset = set(['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
 
                 '6cba7170863a2411822803fa77a0a264f1310b35',
 
                 '56349e29c2af3ac913b28bde9a2c6154436e615b',
 
                 '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                 '6fff84722075f1607a30f436523403845f84cd9e',
 
                 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                 'be90031137367893f1c406e0a8683010fd115b79',
 
                 'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                 '84478366594b424af694a6c784cb991a16b87c21',
 
                 '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
 
                 '786facd2c61deb9cf91e9534735124fb8fc11842',
 
                 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
 
                 'aa6a0de05b7612707db567078e130a6cd114a9a7',
 
                 'eada5a770da98ab0dd7325e29d00e0714f228d09'
 
                ])
 
        assert subset.issubset(set(self.repo.revisions))
 

	
 
        # check if we have the proper order of revisions
 
        org = ['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
 
                '6cba7170863a2411822803fa77a0a264f1310b35',
 
                '56349e29c2af3ac913b28bde9a2c6154436e615b',
 
                '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                '6fff84722075f1607a30f436523403845f84cd9e',
 
                '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                'be90031137367893f1c406e0a8683010fd115b79',
 
                'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                '84478366594b424af694a6c784cb991a16b87c21',
 
                '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                '2e319b85e70a707bba0beff866d9f9de032aa4f9',
 
                '786facd2c61deb9cf91e9534735124fb8fc11842',
 
                '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
 
                'aa6a0de05b7612707db567078e130a6cd114a9a7',
 
                'eada5a770da98ab0dd7325e29d00e0714f228d09',
 
                '2c1885c735575ca478bf9e17b0029dca68824458',
 
                'd9bcd465040bf869799b09ad732c04e0eea99fe9',
 
                '469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7',
 
                '4fb8326d78e5120da2c7468dcf7098997be385da',
 
                '62b4a097164940bd66030c4db51687f3ec035eed',
 
                '536c1a19428381cfea92ac44985304f6a8049569',
 
                '965e8ab3c44b070cdaa5bf727ddef0ada980ecc4',
 
                '9bb326a04ae5d98d437dece54be04f830cf1edd9',
 
                'f8940bcb890a98c4702319fbe36db75ea309b475',
 
                'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
 
                '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
 
                'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
 
        assert org == self.repo.revisions[:31]
 

	
 
    def test_iter_slice(self):
 
        sliced = list(self.repo[:10])
 
        itered = list(self.repo)[:10]
 
        assert sliced == itered
 

	
 
    def test_slicing(self):
 
        # 4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            assert len(revs) == size
 
            assert revs[0] == self.repo.get_changeset(sfrom)
 
            assert revs[-1] == self.repo.get_changeset(sto - 1)
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 

	
 
        # active branches
 
        assert 'default' in self.repo.branches
 
        assert 'stable' in self.repo.branches
 

	
 
        # closed
 
        assert 'git' in self.repo._get_branches(closed=True)
 
        assert 'web' in self.repo._get_branches(closed=True)
 

	
 
        for name, id in self.repo.branches.items():
 
            assert isinstance(self.repo.get_changeset(id), MercurialChangeset)
 

	
 
    def test_tip_in_tags(self):
 
        # tip is always a tag
 
        assert 'tip' in self.repo.tags
 

	
 
    def test_tip_changeset_in_tags(self):
 
        tip = self.repo.get_changeset()
 
        assert self.repo.tags['tip'] == tip.raw_id
 

	
 
    def test_initial_changeset(self):
 

	
 
        init_chset = self.repo.get_changeset(0)
 
        assert init_chset.message == 'initial import'
 
        assert init_chset.author == 'Marcin Kuzminski <marcin@python-blog.com>'
 
        assert sorted(init_chset._file_paths) == sorted([
 
            'vcs/__init__.py',
 
            'vcs/backends/BaseRepository.py',
 
            'vcs/backends/__init__.py',
 
        ])
 

	
 
        assert sorted(init_chset._dir_paths) == sorted(['', 'vcs', 'vcs/backends'])
 

	
 
        with pytest.raises(NodeDoesNotExistError):
 
            init_chset.get_node(path='foobar')
 

	
 
        node = init_chset.get_node('vcs/')
 
        assert hasattr(node, 'kind')
 
        assert node.kind == NodeKind.DIR
 

	
 
        node = init_chset.get_node('vcs')
 
        assert hasattr(node, 'kind')
 
        assert node.kind == NodeKind.DIR
 

	
 
        node = init_chset.get_node('vcs/__init__.py')
 
        assert hasattr(node, 'kind')
 
        assert node.kind == NodeKind.FILE
 

	
 
    def test_not_existing_changeset(self):
 
        # rawid
 
        with pytest.raises(RepositoryError):
 
            self.repo.get_changeset('abcd' * 10)
 
        # shortid
 
        with pytest.raises(RepositoryError):
 
            self.repo.get_changeset('erro' * 4)
 
        # numeric
 
        with pytest.raises(RepositoryError):
 
            self.repo.get_changeset(self.repo.count() + 1)
 

	
 
        # Small chance we ever get to this one
 
        revision = pow(2, 30)
 
        with pytest.raises(RepositoryError):
 
            self.repo.get_changeset(revision)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(10)
 
        readme = """===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
------------
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        assert node.kind == NodeKind.FILE
 
        assert node.content == readme
 

	
 
    @mock.patch('kallithea.lib.vcs.backends.hg.repository.diffopts')
 
    def test_get_diff_does_not_sanitize_zero_context(self, mock_diffopts):
 
        zero_context = 0
 

	
 
        self.repo.get_diff(0, 1, 'foo', context=zero_context)
 

	
 
        mock_diffopts.assert_called_once_with(git=True, showfunc=True, ignorews=False, context=zero_context)
 

	
 
    @mock.patch('kallithea.lib.vcs.backends.hg.repository.diffopts')
 
    def test_get_diff_sanitizes_negative_context(self, mock_diffopts):
 
        negative_context = -10
 
        zero_context = 0
 

	
 
        self.repo.get_diff(0, 1, 'foo', context=negative_context)
 

	
 
        mock_diffopts.assert_called_once_with(git=True, showfunc=True, ignorews=False, context=zero_context)
 

	
 

	
 
class TestMercurialChangeset(object):
 

	
 
    def setup_method(self):
 
        self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
 

	
 
    def _test_equality(self, changeset):
 
        revision = changeset.revision
 
        assert changeset == self.repo.get_changeset(revision)
 

	
 
    def test_equality(self):
 
        revs = [0, 10, 20]
 
        changesets = [self.repo.get_changeset(rev) for rev in revs]
 
        for changeset in changesets:
 
            self._test_equality(changeset)
 

	
 
    def test_default_changeset(self):
 
        tip = self.repo.get_changeset('tip')
 
        assert tip == self.repo.get_changeset()
 
        assert tip == self.repo.get_changeset(revision=None)
 
        assert tip == list(self.repo[-1:])[0]
 

	
 
    def test_root_node(self):
 
        tip = self.repo.get_changeset('tip')
 
        assert tip.root is tip.get_node('')
 

	
 
    def test_lazy_fetch(self):
 
        """
 
        Test if changeset's nodes expands and are cached as we walk through
 
        the revision. This test is somewhat hard to write as order of tests
 
        is a key here. Written by running command after command in a shell.
 
        """
 
        chset = self.repo.get_changeset(45)
 
        assert len(chset.nodes) == 0
 
        root = chset.root
 
        assert len(chset.nodes) == 1
 
        assert len(root.nodes) == 8
 
        # accessing root.nodes updates chset.nodes
 
        assert len(chset.nodes) == 9
 

	
 
        docs = root.get_node('docs')
 
        # we haven't yet accessed anything new as docs dir was already cached
 
        assert len(chset.nodes) == 9
 
        assert len(docs.nodes) == 8
 
        # accessing docs.nodes updates chset.nodes
 
        assert len(chset.nodes) == 17
 

	
 
        assert docs is chset.get_node('docs')
 
        assert docs is root.nodes[0]
 
        assert docs is root.dirs[0]
 
        assert docs is chset.get_node('docs')
 

	
 
    def test_nodes_with_changeset(self):
 
        chset = self.repo.get_changeset(45)
 
        root = chset.root
 
        docs = root.get_node('docs')
 
        assert docs is chset.get_node('docs')
 
        api = docs.get_node('api')
 
        assert api is chset.get_node('docs/api')
 
        index = api.get_node('index.rst')
 
        assert index is chset.get_node('docs/api/index.rst')
 
        assert index is chset.get_node('docs').get_node('api').get_node('index.rst')
 

	
 
    def test_branch_and_tags(self):
 
        chset0 = self.repo.get_changeset(0)
 
        assert chset0.branch == 'default'
 
        assert chset0.branches == ['default']
 
        assert chset0.tags == []
 

	
 
        chset10 = self.repo.get_changeset(10)
 
        assert chset10.branch == 'default'
 
        assert chset10.branches == ['default']
 
        assert chset10.tags == []
 

	
 
        chset44 = self.repo.get_changeset(44)
 
        assert chset44.branch == 'web'
 
        assert chset44.branches == ['web']
 

	
 
        tip = self.repo.get_changeset('tip')
 
        assert 'tip' in tip.tags
 

	
 
    def _test_file_size(self, revision, path, size):
 
        node = self.repo.get_changeset(revision).get_node(path)
 
        assert node.is_file()
 
        assert node.size == size
 

	
 
    def test_file_size(self):
 
        to_check = (
 
            (10, 'setup.py', 1068),
 
            (20, 'setup.py', 1106),
 
            (60, 'setup.py', 1074),
 

	
 
            (10, 'vcs/backends/base.py', 2921),
 
            (20, 'vcs/backends/base.py', 3936),
 
            (60, 'vcs/backends/base.py', 6189),
 
        )
 
        for revision, path, size in to_check:
 
            self._test_file_size(revision, path, size)
 

	
 
    def _test_dir_size(self, revision, path, size):
 
        node = self.repo.get_changeset(revision).get_node(path)
 
        assert not node.is_file()
 
        assert node.size == size
 

	
 
    def test_dir_size(self):
 
        to_check = (
 
            ('96507bd11ecc', '/', 682421),
 
            ('a53d9201d4bc', '/', 682410),
 
            ('90243de06161', '/', 682006),
 
        )
 
        for revision, path, size in to_check:
 
            self._test_dir_size(revision, path, size)
 

	
 
    def test_repo_size(self):
 
        assert self.repo.size == 682421
 

	
 
    def test_file_history(self):
 
        # we can only check if those revisions are present in the history
 
        # as we cannot update this test every time file is changed
 
        files = {
 
            'setup.py': [7, 18, 45, 46, 47, 69, 77],
 
            'vcs/nodes.py': [7, 8, 24, 26, 30, 45, 47, 49, 56, 57, 58, 59, 60,
 
                61, 73, 76],
 
            'vcs/backends/hg.py': [4, 5, 6, 11, 12, 13, 14, 15, 16, 21, 22, 23,
 
                26, 27, 28, 30, 31, 33, 35, 36, 37, 38, 39, 40, 41, 44, 45, 47,
 
                48, 49, 53, 54, 55, 58, 60, 61, 67, 68, 69, 70, 73, 77, 78, 79,
 
                82],
 
        }
 
        for path, revs in files.items():
 
            tip = self.repo.get_changeset(revs[-1])
 
            node = tip.get_node(path)
 
            node_revs = [chset.revision for chset in node.history]
 
            assert set(revs).issubset(set(node_revs)), \
 
                "We assumed that %s is subset of revisions for which file %s " \
 
                "has been changed, and history of that node returned: %s" \
 
                % (revs, path, node_revs)
 

	
 
    def test_file_annotate(self):
 
        files = {
 
                 'vcs/backends/__init__.py':
 
                  {89: {'lines_no': 31,
 
                        'changesets': [32, 32, 61, 32, 32, 37, 32, 32, 32, 44,
 
                                       37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
 
                                       32, 32, 32, 32, 37, 32, 37, 37, 32,
 
                                       32, 32]},
 
                   20: {'lines_no': 1,
 
                        'changesets': [4]},
 
                   55: {'lines_no': 31,
 
                        'changesets': [32, 32, 45, 32, 32, 37, 32, 32, 32, 44,
 
                                       37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
 
                                       32, 32, 32, 32, 37, 32, 37, 37, 32,
 
                                       32, 32]}},
 
                 'vcs/exceptions.py':
 
                 {89: {'lines_no': 18,
 
                       'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
 
                                      16, 16, 17, 16, 16, 18, 18, 18]},
 
                  20: {'lines_no': 18,
 
                       'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
 
                                      16, 16, 17, 16, 16, 18, 18, 18]},
 
                  55: {'lines_no': 18, 'changesets': [16, 16, 16, 16, 16, 16,
 
                                                      16, 16, 16, 16, 16, 16,
 
                                                      17, 16, 16, 18, 18, 18]}},
 
                 'MANIFEST.in': {89: {'lines_no': 5,
 
                                      'changesets': [7, 7, 7, 71, 71]},
 
                                 20: {'lines_no': 3,
 
                                      'changesets': [7, 7, 7]},
 
                                 55: {'lines_no': 3,
 
                                     'changesets': [7, 7, 7]}}}
 

	
 
        for fname, revision_dict in files.items():
 
            for rev, data in revision_dict.items():
 
                cs = self.repo.get_changeset(rev)
 
                l1_1 = [x[1] for x in cs.get_file_annotate(fname)]
 
                l1_2 = [x[2]().raw_id for x in cs.get_file_annotate(fname)]
 
                assert l1_1 == l1_2
 
                l1 = l1_2 = [x[2]().revision for x in cs.get_file_annotate(fname)]
 
                l2 = files[fname][rev]['changesets']
 
                assert l1 == l2, "The lists of revision for %s@rev%s" \
 
                    "from annotation list should match each other," \
 
                    "got \n%s \nvs \n%s " % (fname, rev, l1, l2)
 

	
 
    def test_changeset_state(self):
 
        """
 
        Tests which files have been added/changed/removed at particular revision
 
        """
 

	
 
        # rev 46ad32a4f974:
 
        # hg st --rev 46ad32a4f974
 
        #    changed: 13
 
        #    added:   20
 
        #    removed: 1
 
        changed = set(['.hgignore'
 
            , 'README.rst', 'docs/conf.py', 'docs/index.rst', 'setup.py'
 
            , 'tests/test_hg.py', 'tests/test_nodes.py', 'vcs/__init__.py'
 
            , 'vcs/backends/__init__.py', 'vcs/backends/base.py'
 
            , 'vcs/backends/hg.py', 'vcs/nodes.py', 'vcs/utils/__init__.py'])
 

	
 
        added = set(['docs/api/backends/hg.rst'
 
            , 'docs/api/backends/index.rst', 'docs/api/index.rst'
 
            , 'docs/api/nodes.rst', 'docs/api/web/index.rst'
 
            , 'docs/api/web/simplevcs.rst', 'docs/installation.rst'
 
            , 'docs/quickstart.rst', 'setup.cfg', 'vcs/utils/baseui_config.py'
 
            , 'vcs/utils/web.py', 'vcs/web/__init__.py', 'vcs/web/exceptions.py'
 
            , 'vcs/web/simplevcs/__init__.py', 'vcs/web/simplevcs/exceptions.py'
 
            , 'vcs/web/simplevcs/middleware.py', 'vcs/web/simplevcs/models.py'
 
            , 'vcs/web/simplevcs/settings.py', 'vcs/web/simplevcs/utils.py'
 
            , 'vcs/web/simplevcs/views.py'])
 

	
 
        removed = set(['docs/api.rst'])
 

	
 
        chset64 = self.repo.get_changeset('46ad32a4f974')
 
        assert set((node.path for node in chset64.added)) == added
 
        assert set((node.path for node in chset64.changed)) == changed
 
        assert set((node.path for node in chset64.removed)) == removed
 

	
 
        # rev b090f22d27d6:
 
        # hg st --rev b090f22d27d6
 
        #    changed: 13
 
        #    added:   20
 
        #    removed: 1
 
        chset88 = self.repo.get_changeset('b090f22d27d6')
 
        assert set((node.path for node in chset88.added)) == set()
 
        assert set((node.path for node in chset88.changed)) == set(['.hgignore'])
 
        assert set((node.path for node in chset88.removed)) == set()
 

	
 
        # 85:
 
        #    added:   2 ['vcs/utils/diffs.py', 'vcs/web/simplevcs/views/diffs.py']
 
        #    changed: 4 ['vcs/web/simplevcs/models.py', ...]
 
        #    removed: 1 ['vcs/utils/web.py']
 
        chset85 = self.repo.get_changeset(85)
 
        assert set((node.path for node in chset85.added)) == set([
 
            'vcs/utils/diffs.py',
 
            'vcs/web/simplevcs/views/diffs.py'
 
        ])
 

	
 
        assert set((node.path for node in chset85.changed)) == set([
 
            'vcs/web/simplevcs/models.py',
 
            'vcs/web/simplevcs/utils.py',
 
            'vcs/web/simplevcs/views/__init__.py',
 
            'vcs/web/simplevcs/views/repository.py',
 
        ])
 

	
 
        assert set((node.path for node in chset85.removed)) == set([
 
            'vcs/utils/web.py'
 
        ])
 

	
 

	
 
    def test_files_state(self):
 
        """
 
        Tests state of FileNodes.
 
        """
 
        chset = self.repo.get_changeset(85)
 
        node = chset.get_node('vcs/utils/diffs.py')
 
        assert node.state, NodeState.ADDED
 
        assert node.added
 
        assert not node.changed
 
        assert not node.not_changed
 
        assert not node.removed
 

	
 
        chset = self.repo.get_changeset(88)
 
        node = chset.get_node('.hgignore')
 
        assert node.state, NodeState.CHANGED
 
        assert not node.added
 
        assert node.changed
 
        assert not node.not_changed
 
        assert not node.removed
 

	
 
        chset = self.repo.get_changeset(85)
 
        node = chset.get_node('setup.py')
 
        assert node.state, NodeState.NOT_CHANGED
 
        assert not node.added
 
        assert not node.changed
 
        assert node.not_changed
 
        assert not node.removed
 

	
 
        # If node has REMOVED state then trying to fetch it would raise
 
        # ChangesetError exception
 
        chset = self.repo.get_changeset(2)
 
        path = 'vcs/backends/BaseRepository.py'
 
        with pytest.raises(NodeDoesNotExistError):
 
            chset.get_node(path)
 
        # but it would be one of ``removed`` (changeset's attribute)
 
        assert path in [rf.path for rf in chset.removed]
 

	
 
    def test_commit_message_is_unicode(self):
 
        for cm in self.repo:
 
            assert isinstance(cm.message, unicode)
 

	
 
    def test_changeset_author_is_unicode(self):
 
        for cm in self.repo:
 
            assert isinstance(cm.author, unicode)
 

	
 
    def test_repo_files_content_is_unicode(self):
 
        test_changeset = self.repo.get_changeset(100)
 
        for node in test_changeset.get_node('/'):
 
            if node.is_file():
 
                assert isinstance(node.content, unicode)
 

	
 
    def test_wrong_path(self):
 
        # There is 'setup.py' in the root dir but not there:
 
        path = 'foo/bar/setup.py'
 
        with pytest.raises(VCSError):
 
            self.repo.get_changeset().get_node(path)
 

	
 
    def test_archival_file(self):
 
        # TODO:
 
        pass
 

	
 
    def test_archival_as_generator(self):
 
        # TODO:
 
        pass
 

	
 
    def test_archival_wrong_kind(self):
 
        tip = self.repo.get_changeset()
 
        with pytest.raises(VCSError):
 
            tip.fill_archive(kind='error')
 

	
 
    def test_archival_empty_prefix(self):
 
        # TODO:
 
        pass
 

	
 
    def test_author_email(self):
 
        assert 'marcin@python-blog.com' == self.repo.get_changeset('b986218ba1c9').author_email
 
        assert 'lukasz.balcerzak@python-center.pl' == self.repo.get_changeset('3803844fdbd3').author_email
 
        assert '' == self.repo.get_changeset('84478366594b').author_email
 

	
 
    def test_author_username(self):
 
        assert 'Marcin Kuzminski' == self.repo.get_changeset('b986218ba1c9').author_name
 
        assert 'Lukasz Balcerzak' == self.repo.get_changeset('3803844fdbd3').author_name
 
        assert 'marcink' == self.repo.get_changeset('84478366594b').author_name
 

	
 
    def test_successors(self):
 
        init_chset = self.repo.get_changeset(0)
 
        assert init_chset.successors == []
 

	
 
    def test_predecessors(self):
 
        init_chset = self.repo.get_changeset(0)
 
        assert len(init_chset.predecessors) == 0
kallithea/tests/vcs/test_inmemchangesets.py
Show inline comments
 
# encoding: utf-8
 
"""
 
Tests so called "in memory changesets" commit API of vcs.
 
"""
 

	
 
import datetime
 

	
 
import pytest
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.vcs.exceptions import (
 
    EmptyRepositoryError, NodeAlreadyAddedError, NodeAlreadyChangedError, NodeAlreadyExistsError, NodeAlreadyRemovedError, NodeDoesNotExistError, NodeNotChangedError)
 
from kallithea.lib.vcs.nodes import DirNode, FileNode
 
from kallithea.lib.vcs.utils import safe_unicode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class InMemoryChangesetTestMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        # Note: this is slightly different than the regular _get_commits methods
 
        # as we don't actually return any commits. The creation of commits is
 
        # handled in the tests themselves.
 
        cls.nodes = [
 
            FileNode('foobar', content='Foo & bar'),
 
            FileNode('foobar2', content='Foo & bar, doubled!'),
 
            FileNode('foo bar with spaces', content=''),
 
            FileNode('foo/bar/baz', content='Inside'),
 
            FileNode('foo/bar/file.bin', content='\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'),
 
        ]
 
        commits = []
 
        return commits
 

	
 
    def test_add(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        for node in to_add:
 
            self.imc.add(node)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        assert changeset == newtip
 
        assert rev_count + 1 == len(self.repo.revisions)
 
        assert newtip.message == message
 
        assert newtip.author == author
 
        assert not any((
 
            self.imc.added,
 
            self.imc.changed,
 
            self.imc.removed
 
        ))
 
        for node in to_add:
 
            assert newtip.get_node(node.path).content == node.content
 

	
 
    def test_add_in_bulk(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        self.imc.add(*to_add)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        assert changeset == newtip
 
        assert rev_count + 1 == len(self.repo.revisions)
 
        assert newtip.message == message
 
        assert newtip.author == author
 
        assert not any((
 
            self.imc.added,
 
            self.imc.changed,
 
            self.imc.removed
 
        ))
 
        for node in to_add:
 
            assert newtip.get_node(node.path).content == node.content
 

	
 
    def test_add_actually_adds_all_nodes_at_second_commit_too(self):
 
        self.imc.add(FileNode('foo/bar/image.png', content='\0'))
 
        self.imc.add(FileNode('foo/README.txt', content='readme!'))
 
        changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
 
        assert isinstance(changeset.get_node('foo'), DirNode)
 
        assert isinstance(changeset.get_node('foo/bar'), DirNode)
 
        assert changeset.get_node('foo/bar/image.png').content == '\0'
 
        assert changeset.get_node('foo/README.txt').content == 'readme!'
 

	
 
        # commit some more files again
 
        to_add = [
 
            FileNode('foo/bar/foobaz/bar', content='foo'),
 
            FileNode('foo/bar/another/bar', content='foo'),
 
            FileNode('foo/baz.txt', content='foo'),
 
            FileNode('foobar/foobaz/file', content='foo'),
 
            FileNode('foobar/barbaz', content='foo'),
 
        ]
 
        self.imc.add(*to_add)
 
        changeset = self.imc.commit(u'Another', u'joe.doe@example.com')
 
        changeset.get_node('foo/bar/foobaz/bar').content == 'foo'
 
        changeset.get_node('foo/bar/another/bar').content == 'foo'
 
        changeset.get_node('foo/baz.txt').content == 'foo'
 
        changeset.get_node('foobar/foobaz/file').content == 'foo'
 
        changeset.get_node('foobar/barbaz').content == 'foo'
 

	
 
    def test_add_non_ascii_files(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [
 
            FileNode('żółwik/zwierzątko', content='ćććć'),
 
            FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
 
        ]
 
        for node in to_add:
 
            self.imc.add(node)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        assert changeset == newtip
 
        assert rev_count + 1 == len(self.repo.revisions)
 
        assert newtip.message == message
 
        assert newtip.author == author
 
        assert not any((
 
            self.imc.added,
 
            self.imc.changed,
 
            self.imc.removed
 
        ))
 
        for node in to_add:
 
            assert newtip.get_node(node.path).content == node.content
 

	
 
    def test_add_raise_already_added(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        with pytest.raises(NodeAlreadyAddedError):
 
            self.imc.add(node)
 

	
 
    def test_check_integrity_raise_already_exist(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        self.imc.add(node)
 
        with pytest.raises(NodeAlreadyExistsError):
 
            self.imc.commit(message='new message',
 
                            author=str(self))
 

	
 
    def test_change(self):
 
        self.imc.add(FileNode('foo/bar/baz', content='foo'))
 
        self.imc.add(FileNode('foo/fbar', content='foobar'))
 
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
 

	
 
        # Change node's content
 
        node = FileNode('foo/bar/baz', content='My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
 

	
 
        newtip = self.repo.get_changeset()
 
        assert tip != newtip
 
        assert tip.id != newtip.id
 
        assert newtip.get_node('foo/bar/baz').content == 'My **changed** content'
 

	
 
    def test_change_non_ascii(self):
 
        to_add = [
 
            FileNode('żółwik/zwierzątko', content='ćććć'),
 
            FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
 
        ]
 
        for node in to_add:
 
            self.imc.add(node)
 

	
 
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
 

	
 
        # Change node's content
 
        node = FileNode('żółwik/zwierzątko', content='My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % safe_unicode(node.path),
 
                        u'joe.doe@example.com')
 

	
 
        node = FileNode(u'żółwik/zwierzątko_uni', content=u'My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % safe_unicode(node.path),
 
                        u'joe.doe@example.com')
 

	
 
        newtip = self.repo.get_changeset()
 
        assert tip != newtip
 
        assert tip.id != newtip.id
 

	
 
        assert newtip.get_node('żółwik/zwierzątko').content == 'My **changed** content'
 
        assert newtip.get_node('żółwik/zwierzątko_uni').content == 'My **changed** content'
 

	
 
    def test_change_raise_empty_repository(self):
 
        node = FileNode('foobar')
 
        with pytest.raises(EmptyRepositoryError):
 
            self.imc.change(node)
 

	
 
    def test_check_integrity_change_raise_node_does_not_exist(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        node = FileNode('not-foobar', content='')
 
        self.imc.change(node)
 
        with pytest.raises(NodeDoesNotExistError):
 
            self.imc.commit(message='Changed not existing node', author=str(self))
 

	
 
    def test_change_raise_node_already_changed(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        node = FileNode('foobar', content='more baz')
 
        self.imc.change(node)
 
        with pytest.raises(NodeAlreadyChangedError):
 
            self.imc.change(node)
 

	
 
    def test_check_integrity_change_raise_node_not_changed(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path, content=self.nodes[0].content)
 
        self.imc.change(node)
 
        with pytest.raises(NodeNotChangedError):
 
            self.imc.commit(
 
                message=u'Trying to mark node as changed without touching it',
 
                author=unicode(self)
 
            )
 

	
 
    def test_change_raise_node_already_removed(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        self.imc.remove(FileNode('foobar'))
 
        with pytest.raises(NodeAlreadyRemovedError):
 
            self.imc.change(node)
 

	
 
    def test_remove(self):
 
        self.test_add()  # Performs first commit
 

	
 
        tip = self.repo.get_changeset()
 
        node = self.nodes[0]
 
        assert node.content == tip.get_node(node.path).content
 
        self.imc.remove(node)
 
        self.imc.commit(message=u'Removed %s' % node.path, author=unicode(self))
 

	
 
        newtip = self.repo.get_changeset()
 
        assert tip != newtip
 
        assert tip.id != newtip.id
 
        with pytest.raises(NodeDoesNotExistError):
 
            newtip.get_node(node.path)
 

	
 
    def test_remove_last_file_from_directory(self):
 
        node = FileNode('omg/qwe/foo/bar', content='foobar')
 
        self.imc.add(node)
 
        self.imc.commit(u'added', u'joe doe')
 

	
 
        self.imc.remove(node)
 
        tip = self.imc.commit(u'removed', u'joe doe')
 
        with pytest.raises(NodeDoesNotExistError):
 
            tip.get_node('omg/qwe/foo/bar')
 

	
 
    def test_remove_raise_node_does_not_exist(self):
 
        self.imc.remove(self.nodes[0])
 
        with pytest.raises(NodeDoesNotExistError):
 
            self.imc.commit(
 
                message='Trying to remove node at empty repository',
 
                author=str(self)
 
            )
 

	
 
    def test_check_integrity_remove_raise_node_does_not_exist(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode('no-such-file')
 
        self.imc.remove(node)
 
        with pytest.raises(NodeDoesNotExistError):
 
            self.imc.commit(
 
                message=u'Trying to remove not existing node',
 
                author=unicode(self)
 
            )
 

	
 
    def test_remove_raise_node_already_removed(self):
 
        self.test_add() # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path)
 
        self.imc.remove(node)
 
        with pytest.raises(NodeAlreadyRemovedError):
 
            self.imc.remove(node)
 

	
 
    def test_remove_raise_node_already_changed(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path, content='Bending time')
 
        self.imc.change(node)
 
        with pytest.raises(NodeAlreadyChangedError):
 
            self.imc.remove(node)
 

	
 
    def test_reset(self):
 
        self.imc.add(FileNode('foo', content='bar'))
 
        #self.imc.change(FileNode('baz', content='new'))
 
        #self.imc.remove(FileNode('qwe'))
 
        self.imc.reset()
 
        assert not any((
 
            self.imc.added,
 
            self.imc.changed,
 
            self.imc.removed
 
        ))
 

	
 
    def test_multiple_commits(self):
 
        N = 3  # number of commits to perform
 
        last = None
 
        for x in xrange(N):
 
            fname = 'file%s' % str(x).rjust(5, '0')
 
            content = 'foobar\n' * x
 
            node = FileNode(fname, content=content)
 
            self.imc.add(node)
 
            commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs')
 
            assert last != commit
 
            last = commit
 

	
 
        # Check commit number for same repo
 
        assert len(self.repo.revisions) == N
 

	
 
        # Check commit number for recreated repo
 
        assert len(self.repo.revisions) == N
 

	
 
    def test_date_attr(self):
 
        node = FileNode('foobar.txt', content='Foobared!')
 
        self.imc.add(node)
 
        date = datetime.datetime(1985, 1, 30, 1, 45)
 
        commit = self.imc.commit(u"Committed at time when I was born ;-)",
 
            author=u'lb <lb@example.com>', date=date)
 

	
 
        assert commit.date == date
 

	
 

	
 
class TestGitInMemoryChangeset(InMemoryChangesetTestMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgInMemoryChangeset(InMemoryChangesetTestMixin):
 
    backend_alias = 'hg'
kallithea/tests/vcs/test_vcs.py
Show inline comments
 
import os
 
import shutil
 

	
 
import pytest
 

	
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.vcs import VCSError, get_backend, get_repo
 
from kallithea.lib.vcs.backends.hg import MercurialRepository
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.tests.vcs.conf import TEST_GIT_REPO, TEST_HG_REPO, TESTS_TMP_PATH
 

	
 

	
 
class TestVCS(object):
 
    """
 
    Tests for main module's methods.
 
    """
 

	
 
    def test_get_backend(self):
 
        hg = get_backend('hg')
 
        assert hg == MercurialRepository
 

	
 
    def test_alias_detect_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 
        assert 'hg' == repo.alias
 

	
 
    def test_alias_detect_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 
        assert 'git' == repo.alias
 

	
 
    def test_wrong_alias(self):
 
        alias = 'wrong_alias'
 
        with pytest.raises(VCSError):
 
            get_backend(alias)
 

	
 
    def test_get_repo(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 

	
 
        assert repo.__class__ == get_repo(safe_str(path), alias).__class__
 
        assert repo.path == get_repo(safe_str(path), alias).path
 

	
 
    def test_get_repo_autoalias_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 

	
 
        assert repo.__class__ == get_repo(safe_str(path)).__class__
 
        assert repo.path == get_repo(safe_str(path)).path
 

	
 
    def test_get_repo_autoalias_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(safe_str(path))
 

	
 
        assert repo.__class__ == get_repo(safe_str(path)).__class__
 
        assert repo.path == get_repo(safe_str(path)).path
 

	
 
    def test_get_repo_err(self):
 
        blank_repo_path = os.path.join(TESTS_TMP_PATH, 'blank-error-repo')
 
        if os.path.isdir(blank_repo_path):
 
            shutil.rmtree(blank_repo_path)
 

	
 
        os.mkdir(blank_repo_path)
 
        with pytest.raises(VCSError):
 
            get_repo(blank_repo_path)
 
        with pytest.raises(VCSError):
 
            get_repo(blank_repo_path + 'non_existing')
 

	
 
    def test_get_repo_multialias(self):
 
        multialias_repo_path = os.path.join(TESTS_TMP_PATH, 'hg-git-repo')
 
        if os.path.isdir(multialias_repo_path):
 
            shutil.rmtree(multialias_repo_path)
 

	
 
        os.mkdir(multialias_repo_path)
 

	
 
        os.mkdir(os.path.join(multialias_repo_path, '.git'))
 
        os.mkdir(os.path.join(multialias_repo_path, '.hg'))
 
        with pytest.raises(VCSError):
 
            get_repo(multialias_repo_path)
0 comments (0 inline, 0 general)