Changeset - bd4840ad72d3
[Not reviewed]
kallithea/tests/__init__.py
Show inline comments
 
@@ -55,104 +55,104 @@ from nose.plugins.skip import SkipTest
 

	
 
from kallithea.lib.compat import unittest
 
from kallithea import is_windows
 
from kallithea.model.db import Notification, User, UserNotification
 
from kallithea.model.meta import Session
 
from kallithea.tests.parameterized import parameterized
 
from kallithea.lib.utils2 import safe_str
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'parameterized', 'environ', 'url', 'TestController',
 
    'SkipTest', 'ldap_lib_installed', 'pam_lib_installed', 'BaseTestCase', 'init_stack',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_ADMIN_EMAIL', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@example.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@example.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@example.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 
HG_REPO = u'vcs_test_hg'
 
GIT_REPO = u'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 
NEW_HG_REPO = u'vcs_test_hg_new'
 
NEW_GIT_REPO = u'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 
HG_FORK = u'vcs_test_hg_fork'
 
GIT_FORK = u'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 
#skip ldap tests if LDAP lib is not installed
 
ldap_lib_installed = False
 
try:
 
    import ldap
 
    ldap.API_VERSION
 
    ldap_lib_installed = True
 
except ImportError:
 
    # means that python-ldap is not installed
 
    pass
 

	
 
try:
 
    import pam
 
    pam.PAM_TEXT_INFO
 
    pam_lib_installed = True
 
except ImportError:
 
    pam_lib_installed = False
 

	
 
import logging
 

	
 
class NullHandler(logging.Handler):
 
    def emit(self, record):
 
        pass
kallithea/tests/api/api_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
tests for api. run with::
 

	
 
    KALLITHEA_WHOOSH_TEST_DISABLE=1 nosetests --with-coverage --cover-package=kallithea.controllers.api.api -x kallithea/tests/api
 
"""
 

	
 
import os
 
import random
 
import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.db import Repository, User, Setting
 
from kallithea.lib.utils2 import time_to_datetime
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = 'test_user_group'
 
TEST_REPO_GROUP = 'test_repo_group'
 
TEST_USER_GROUP = u'test_user_group'
 
TEST_REPO_GROUP = u'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 

	
 
    :param random_id:
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 

	
 
jsonify = lambda obj: json.loads(json.dumps(obj))
 

	
 

	
 
def crash(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
def api_call(test_obj, params):
 
    response = test_obj.app.post(API_URL, content_type='application/json',
 
                                 params=params)
 
    return response
 

	
 

	
 
## helpers
 
def make_user_group(name=TEST_USER_GROUP):
 
    gr = fixture.create_user_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    UserGroupModel().add_user_to_group(user_group=gr,
 
                                       user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
def make_repo_group(name=TEST_REPO_GROUP):
 
    gr = fixture.create_repo_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
class _BaseTestApi(object):
 
    REPO = None
 
    REPO_TYPE = None
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        cls.usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        cls.apikey = cls.usr.api_key
 
        cls.test_user = UserModel().create_or_update(
 
            username='test-api',
 
            password='test',
 
            email='test@example.com',
 
            firstname='first',
 
            lastname='last'
 
            firstname=u'first',
 
            lastname=u'last'
 
        )
 
        Session().commit()
 
        cls.TEST_USER_LOGIN = cls.test_user.username
 
        cls.apikey_regular = cls.test_user.api_key
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        pass
 

	
 
    def setUp(self):
 
        self.maxDiff = None
 
        make_user_group()
 
        make_repo_group()
 

	
 
    def tearDown(self):
 
        fixture.destroy_user_group(TEST_USER_GROUP)
 
        fixture.destroy_gists()
 
        fixture.destroy_repo_group(TEST_REPO_GROUP)
 

	
 
    def _compare_ok(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': None,
 
            'result': expected
 
        })
 
        given = json.loads(given)
 
        self.assertEqual(expected, given)
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': expected,
 
            'result': None
 
        })
 
        given = json.loads(given)
 
        self.assertEqual(expected, given)
 

	
 
    def test_Optional_object(self):
 
        from kallithea.controllers.api.api import Optional
 

	
 
        option1 = Optional(None)
 
        self.assertEqual('<Optional:%s>' % None, repr(option1))
 
        self.assertEqual(option1(), None)
 

	
 
        self.assertEqual(1, Optional.extract(Optional(1)))
 
        self.assertEqual('trololo', Optional.extract('trololo'))
 

	
 
    def test_Optional_OAttr(self):
 
@@ -233,348 +233,348 @@ class _BaseTestApi(object):
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_that_does_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` does not exist" % 'trololo'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(self.TEST_USER_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_with_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull(self):
 
        repo_name = 'test_pull'
 
        repo_name = u'test_pull'
 
        r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        r.clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
 
        Session.add(r)
 
        Session.commit()
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_pull_error(self):
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=self.REPO, )
 
        response = api_call(self, params)
 

	
 
        expected = 'Unable to pull changes from `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_rescan_repos(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos')
 
        response = api_call(self, params)
 

	
 
        expected = {'added': [], 'removed': []}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'repo_scan', crash)
 
    def test_api_rescann_error(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos', )
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during rescan repositories action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache(self):
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        repo.scm_instance_cached()  # seed cache
 

	
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': "Cache for repository `%s` was invalidated" % (self.REPO,),
 
            'repository': self.REPO
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
 
    def test_api_invalidate_cache_error(self):
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during cache invalidation action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache_regular_user_no_permission(self):
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        repo.scm_instance_cached() # seed cache
 

	
 
        id_, params = _build_data(self.apikey_regular, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = "repository `%s` does not exist" % (self.REPO,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = {
 
            'repo': self.REPO, 'locked': True,
 
            'locked_since': response.json['result']['locked_since'],
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': True,
 
            'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                    % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire_by_non_admin(self):
 
        repo_name = 'api_delete_me'
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'lock',
 
                                      repoid=repo_name,
 
                                      locked=True)
 
            response = api_call(self, params)
 
            expected = {
 
                'repo': repo_name,
 
                'locked': True,
 
                'locked_since': response.json['result']['locked_since'],
 
                'locked_by': self.TEST_USER_LOGIN,
 
                'lock_state_changed': True,
 
                'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                        % (self.TEST_USER_LOGIN, repo_name, True))
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_lock_repo_lock_aquire_non_admin_with_userid(self):
 
        repo_name = 'api_delete_me'
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'lock',
 
                                      userid=TEST_USER_ADMIN_LOGIN,
 
                                      repoid=repo_name,
 
                                      locked=True)
 
            response = api_call(self, params)
 
            expected = 'userid is not the same as your user'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
 
        id_, params = _build_data(self.apikey_regular, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_release(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=False)
 
        response = api_call(self, params)
 
        expected = {
 
            'repo': self.REPO,
 
            'locked': False,
 
            'locked_since': None,
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': True,
 
            'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                    % (TEST_USER_ADMIN_LOGIN, self.REPO, False))
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire_optional_userid(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        time_ = response.json['result']['locked_since']
 
        expected = {
 
            'repo': self.REPO,
 
            'locked': True,
 
            'locked_since': time_,
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': True,
 
            'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                    % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
 
        }
 

	
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_optional_locked(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 
        time_ = response.json['result']['locked_since']
 
        expected = {
 
            'repo': self.REPO,
 
            'locked': True,
 
            'locked_since': time_,
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': False,
 
            'msg': ('Repo `%s` locked by `%s` on `%s`.'
 
                    % (self.REPO, TEST_USER_ADMIN_LOGIN,
 
                       json.dumps(time_to_datetime(time_))))
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_optional_not_locked(self):
 
        repo_name = 'api_not_locked'
 
        repo_name = u'api_not_locked'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        self.assertEqual(repo.locked, [None, None])
 
        try:
 
            id_, params = _build_data(self.apikey, 'lock',
 
                                      repoid=repo.repo_id)
 
            response = api_call(self, params)
 
            expected = {
 
                'repo': repo_name,
 
                'locked': False,
 
                'locked_since': None,
 
                'locked_by': None,
 
                'lock_state_changed': False,
 
                'msg': ('Repo `%s` not locked.' % (repo_name,))
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(Repository, 'lock', crash)
 
    def test_api_lock_error(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred locking repository `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks_regular_user(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_locks')
 
        response = api_call(self, params)
 
        expected = []
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks_with_userid_regular_user(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_locks',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks(self):
 
        id_, params = _build_data(self.apikey, 'get_locks')
 
        response = api_call(self, params)
 
        expected = []
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks_with_one_locked_repo(self):
 
        repo_name = 'api_delete_me'
 
        repo_name = u'api_delete_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                                   cur_user=self.TEST_USER_LOGIN)
 
        Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
 
        try:
 
            id_, params = _build_data(self.apikey, 'get_locks')
 
            response = api_call(self, params)
 
            expected = [repo.get_api_data()]
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_get_locks_with_one_locked_repo_for_specific_user(self):
 
        repo_name = 'api_delete_me'
 
        repo_name = u'api_delete_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                                   cur_user=self.TEST_USER_LOGIN)
 
        Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
 
        try:
 
            id_, params = _build_data(self.apikey, 'get_locks',
 
                                      userid=self.TEST_USER_LOGIN)
 
            response = api_call(self, params)
 
            expected = [repo.get_api_data()]
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_get_locks_with_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_locks',
 
                                  userid=TEST_USER_REGULAR_LOGIN)
 
        response = api_call(self, params)
 
        expected = []
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_create_existing_user(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN,
 
                                  email='test@example.com',
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user_with_existing_email(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN + 'new',
 
                                  email=TEST_USER_REGULAR_EMAIL,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user(self):
 
        username = 'test_new_api_user'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 
@@ -721,97 +721,97 @@ class _BaseTestApi(object):
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_by_user_id(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_default_user(self):
 
        usr = User.get_default_user()
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        expected = 'editing default user is forbidden'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'update_user', crash)
 
    def test_api_update_user_when_exception_happens(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = 'failed to update user `%s`' % usr.user_id
 

	
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo(self):
 
        new_group = 'some_new_group'
 
        new_group = u'some_new_group'
 
        make_user_group(new_group)
 
        RepoModel().grant_user_group_permission(repo=self.REPO,
 
                                                group_name=new_group,
 
                                                perm='repository.read')
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {'name': user.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {'name': user_group.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_user_group(new_group)
 

	
 
    @parameterized.expand([
 
        ('repository.admin',),
 
        ('repository.write',),
 
        ('repository.read',),
 
    ])
 
    def test_api_get_repo_by_non_admin(self, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 
@@ -930,817 +930,817 @@ class _BaseTestApi(object):
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_ret_type(self):
 
        rev = 'tip'
 
        path = '/'
 
        ret_type = 'error'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        expected = ('ret_type must be one of %s'
 
                    % (','.join(['files', 'dirs', 'all'])))
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('all', 'all', 'repository.write'),
 
                           ('dirs', 'dirs', 'repository.admin'),
 
                           ('files', 'files', 'repository.read'), ])
 
    def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 

	
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_create_repo(self):
 
        repo_name = 'api-repo'
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_and_repo_group(self):
 
        repo_name = 'my_gr/api-repo'
 
        repo_name = u'my_gr/api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        print params
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 
        fixture.destroy_repo_group('my_gr')
 
        fixture.destroy_repo_group(u'my_gr')
 

	
 
    def test_api_create_repo_in_repo_group_without_permission(self):
 
        repo_group_name = '%s/api-repo-repo' % TEST_REPO_GROUP
 
        repo_name = '%s/api-repo' % repo_group_name
 
        repo_group_name = u'%s/api-repo-repo' % TEST_REPO_GROUP
 
        repo_name = u'%s/api-repo' % repo_group_name
 

	
 
        rg = fixture.create_repo_group(repo_group_name)
 
        Session().commit()
 
        RepoGroupModel().grant_user_permission(repo_group_name,
 
                                               self.TEST_USER_LOGIN,
 
                                               'group.none')
 
        Session().commit()
 

	
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        # Current result when API access control is different from Web:
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
        # Expected and arguably more correct result:
 
        #expected = 'failed to create repository `%s`' % repo_name
 
        #self._compare_error(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo_group(repo_group_name)
 

	
 
    def test_api_create_repo_unknown_owner(self):
 
        repo_name = 'api-repo'
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=owner,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dont_specify_owner(self):
 
        repo_name = 'api-repo'
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin(self):
 
        repo_name = 'api-repo'
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin_specify_owner(self):
 
        repo_name = 'api-repo'
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
                                  owner=owner)
 
        response = api_call(self, params)
 

	
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_exists(self):
 
        repo_name = self.REPO
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = "repo `%s` already exist" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create', crash)
 
    def test_api_create_repo_exception_occurred(self):
 
        repo_name = 'api-repo'
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = 'failed to create repository `%s`' % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': 'new description'}),
 
        ('description', {'description': u'new description'}),
 
        ('active', {'active': True}),
 
        ('active', {'active': False}),
 
        ('clone_uri', {'clone_uri': 'http://example.com/repo'}),
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
        ('enable_locking', {'enable_locking': True}),
 
        ('enable_downloads', {'enable_downloads': True}),
 
        ('name', {'name': 'new_repo_name'}),
 
        ('repo_group', {'group': 'test_group_for_update'}),
 
        ('name', {'name': u'new_repo_name'}),
 
        ('repo_group', {'group': u'test_group_for_update'}),
 
    ])
 
    def test_api_update_repo(self, changing_attr, updates):
 
        repo_name = 'api_update_me'
 
        repo_name = u'api_update_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        if changing_attr == 'repo_group':
 
            fixture.create_repo_group(updates['group'])
 

	
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        if changing_attr == 'name':
 
            repo_name = updates['name']
 
        if changing_attr == 'repo_group':
 
            repo_name = '/'.join([updates['group'], repo_name])
 
            repo_name = u'/'.join([updates['group'], repo_name])
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
 

	
 
    def test_api_update_repo_repo_group_does_not_exist(self):
 
        repo_name = 'admin_owned'
 
        repo_name = u'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'group': 'test_group_for_update'}
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository group `%s` does not exist' % updates['group']
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_not_allowed(self):
 
        repo_name = 'admin_owned'
 
        repo_name = u'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'active': False}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository `%s` does not exist' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(RepoModel, 'update', crash)
 
    def test_api_update_repo_exception_occurred(self):
 
        repo_name = 'api_update_me'
 
        repo_name = u'api_update_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update repo `%s`' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name(self):
 
        repo_name = 'admin_owned'
 
        new_repo_name = 'new_repo_name'
 
        repo_name = u'admin_owned'
 
        new_repo_name = u'new_repo_name'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'no permission to create (or move) repositories'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name_allowed(self):
 
        repo_name = 'admin_owned'
 
        new_repo_name = 'new_repo_name'
 
        repo_name = u'admin_owned'
 
        new_repo_name = u'new_repo_name'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.none')
 
        UserModel().grant_perm('default', 'hg.create.repository')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_owner(self):
 
        repo_name = 'admin_owned'
 
        repo_name = u'admin_owned'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        updates = {'owner': TEST_USER_ADMIN_LOGIN}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'Only Kallithea admin can specify `owner` param'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo(self):
 
        repo_name = 'api_delete_me'
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 

	
 
        id_, params = _build_data(self.apikey, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin(self):
 
        repo_name = 'api_delete_me'
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin_no_permission(self):
 
        repo_name = 'api_delete_me'
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                      repoid=repo_name, )
 
            response = api_call(self, params)
 
            expected = 'repository `%s` does not exist' % (repo_name)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_exception_occurred(self):
 
        repo_name = 'api_delete_me'
 
        repo_name = u'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            with mock.patch.object(RepoModel, 'delete', crash):
 
                id_, params = _build_data(self.apikey, 'delete_repo',
 
                                          repoid=repo_name, )
 
                response = api_call(self, params)
 

	
 
                expected = 'failed to delete repository `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_fork_repo(self):
 
        fork_name = 'api-repo-fork'
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin(self):
 
        fork_name = 'api-repo-fork'
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_specify_owner(self):
 
        fork_name = 'api-repo-fork'
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_no_permission_to_fork(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 
        fork_name = 'api-repo-fork'
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    @parameterized.expand([('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
 
        fork_name = 'api-repo-fork'
 
        fork_name = u'api-repo-fork'
 
        # regardless of base repository permission, forking is disallowed
 
        # when repository creation is disabled
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=perm)
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'no permission to create repositories'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_unknown_owner(self):
 
        fork_name = 'api-repo-fork'
 
        fork_name = u'api-repo-fork'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=owner,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_fork_repo_fork_exists(self):
 
        fork_name = 'api-repo-fork'
 
        fork_name = u'api-repo-fork'
 
        fixture.create_fork(self.REPO, fork_name)
 

	
 
        try:
 
            fork_name = 'api-repo-fork'
 
            fork_name = u'api-repo-fork'
 

	
 
            id_, params = _build_data(self.apikey, 'fork_repo',
 
                                      repoid=self.REPO,
 
                                      fork_name=fork_name,
 
                                      owner=TEST_USER_ADMIN_LOGIN,
 
            )
 
            response = api_call(self, params)
 

	
 
            expected = "fork `%s` already exist" % fork_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_repo_exists(self):
 
        fork_name = self.REPO
 

	
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = "repo `%s` already exist" % fork_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create_fork', crash)
 
    def test_api_fork_repo_exception_occurred(self):
 
        fork_name = 'api-repo-fork'
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
 
                                                               fork_name)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_group(self):
 
        id_, params = _build_data(self.apikey, 'get_user_group',
 
                                  usergroupid=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        user_group = UserGroupModel().get_group(TEST_USER_GROUP)
 
        members = []
 
        for user in user_group.members:
 
            user = user.user
 
            members.append(user.get_api_data())
 

	
 
        ret = user_group.get_api_data()
 
        ret['members'] = members
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_groups(self):
 
        gr_name = 'test_user_group2'
 
        gr_name = u'test_user_group2'
 
        make_user_group(gr_name)
 

	
 
        id_, params = _build_data(self.apikey, 'get_user_groups', )
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = []
 
            for gr_name in [TEST_USER_GROUP, 'test_user_group2']:
 
            for gr_name in [TEST_USER_GROUP, u'test_user_group2']:
 
                user_group = UserGroupModel().get_group(gr_name)
 
                ret = user_group.get_api_data()
 
                expected.append(ret)
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_create_user_group(self):
 
        group_name = 'some_new_group'
 
        group_name = u'some_new_group'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'created new user group `%s`' % group_name,
 
            'user_group': jsonify(UserGroupModel() \
 
                .get_by_name(group_name) \
 
                .get_api_data())
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_user_group(group_name)
 

	
 
    def test_api_get_user_group_that_exist(self):
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        expected = "user group `%s` already exist" % TEST_USER_GROUP
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'create', crash)
 
    def test_api_get_user_group_exception_occurred(self):
 
        group_name = 'exception_happens'
 
        group_name = u'exception_happens'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to create group `%s`' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('group_name', {'group_name': 'new_group_name'}),
 
                           ('group_name', {'group_name': 'test_group_for_update'}),
 
    @parameterized.expand([('group_name', {'group_name': u'new_group_name'}),
 
                           ('group_name', {'group_name': u'test_group_for_update'}),
 
                           ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
                           ('active', {'active': False}),
 
                           ('active', {'active': True})])
 
    def test_api_update_user_group(self, changing_attr, updates):
 
        gr_name = 'test_group_for_update'
 
        gr_name = u'test_group_for_update'
 
        user_group = fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'update_user_group',
 
                                  usergroupid=gr_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
               'msg': 'updated user group ID:%s %s' % (user_group.users_group_id,
 
                                                     user_group.users_group_name),
 
               'user_group': user_group.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            if changing_attr == 'group_name':
 
                # switch to updated name for proper cleanup
 
                gr_name = updates['group_name']
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'update', crash)
 
    def test_api_update_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        gr_name = u'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'update_user_group',
 
                                  usergroupid=gr_name)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group(self):
 
        gr_name = 'test_group'
 
        gr_name = u'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
            'msg': 'added member `%s` to user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name),
 
            'success': True
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group_that_doesnt_exist(self):
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid='false-group',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'user group `%s` does not exist' % 'false-group'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'add_user_to_group', crash)
 
    def test_api_add_user_to_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        gr_name = u'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = 'failed to add member to user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_remove_user_from_user_group(self):
 
        gr_name = 'test_group_3'
 
        gr_name = u'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = {
 
                'msg': 'removed member `%s` from user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name
 
                ),
 
                'success': True}
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'remove_user_from_group', crash)
 
    def test_api_remove_user_from_user_group_exception_occurred(self):
 
        gr_name = 'test_group_3'
 
        gr_name = u'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to remove member from user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group(self):
 
        gr_name = 'test_group'
 
        gr_name = u'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = {
 
                'user_group': None,
 
                'msg': 'deleted user group ID:%s %s' % (gr_id, gr_name)
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            if UserGroupModel().get_by_name(gr_name):
 
                fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group_that_is_assigned(self):
 
        gr_name = 'test_group'
 
        gr_name = u'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 

	
 
        ugr_to_perm = RepoModel().grant_user_group_permission(self.REPO, gr_name, 'repository.write')
 
        msg = 'User Group assigned to %s' % ugr_to_perm.repository.repo_name
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = msg
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            if UserGroupModel().get_by_name(gr_name):
 
                fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        gr_name = u'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 

	
 
        try:
 
            with mock.patch.object(UserGroupModel, 'delete', crash):
 
                response = api_call(self, params)
 
                expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name)
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @parameterized.expand([('none', 'repository.none'),
 
                           ('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_grant_user_permission(self, name, perm):
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
 
                perm, TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_grant_user_permission_wrong_permission(self):
 
        perm = 'haha.no.permission'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'permission `%s` does not exist' % perm
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'grant_user_permission', crash)
kallithea/tests/fixture.py
Show inline comments
 
@@ -31,146 +31,146 @@ from kallithea.lib.vcs.backends.base imp
 
dn = os.path.dirname
 
FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'tests', 'fixtures')
 

	
 

	
 
def error_function(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
class Fixture(object):
 

	
 
    def __init__(self):
 
        pass
 

	
 
    def anon_access(self, status):
 
        """
 
        Context manager for controlling anonymous access.
 
        Anon access will be set and committed, but restored again when exiting the block.
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
        with fixture.anon_access(False):
 
            stuff
 
        """
 

	
 
        class context(object):
 
            def __enter__(self):
 
                anon = User.get_default_user()
 
                self._before = anon.active
 
                anon.active = status
 
                Session().add(anon)
 
                Session().commit()
 
                time.sleep(1.5)  # hack: wait for beaker sql_cache_short to expire
 

	
 
            def __exit__(self, exc_type, exc_val, exc_tb):
 
                anon = User.get_default_user()
 
                anon.active = self._before
 
                Session().add(anon)
 
                Session().commit()
 

	
 
        return context()
 

	
 
    def _get_repo_create_params(self, **custom):
 
        defs = dict(
 
            repo_name=None,
 
            repo_type='hg',
 
            clone_uri='',
 
            repo_group=u'-1',
 
            repo_description='DESC',
 
            repo_description=u'DESC',
 
            repo_private=False,
 
            repo_landing_rev='rev:tip',
 
            repo_copy_permissions=False,
 
            repo_state=Repository.STATE_CREATED,
 
        )
 
        defs.update(custom)
 
        if 'repo_name_full' not in custom:
 
            defs.update({'repo_name_full': defs['repo_name']})
 

	
 
        # fix the repo name if passed as repo_name_full
 
        if defs['repo_name']:
 
            defs['repo_name'] = defs['repo_name'].split('/')[-1]
 

	
 
        return defs
 

	
 
    def _get_group_create_params(self, **custom):
 
        defs = dict(
 
            group_name=None,
 
            group_description='DESC',
 
            group_description=u'DESC',
 
            group_parent_id=None,
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_create_params(self, name, **custom):
 
        defs = dict(
 
            username=name,
 
            password='qweqwe',
 
            email='%s+test@example.com' % name,
 
            firstname='TestUser',
 
            lastname='Test',
 
            firstname=u'TestUser',
 
            lastname=u'Test',
 
            active=True,
 
            admin=False,
 
            extern_type='internal',
 
            extern_name=None
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_group_create_params(self, name, **custom):
 
        defs = dict(
 
            users_group_name=name,
 
            user_group_description='DESC',
 
            user_group_description=u'DESC',
 
            users_group_active=True,
 
            user_group_data={},
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def create_repo(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            r = Repository.get_by_repo_name(name)
 
            if r:
 
                return r
 

	
 
        if isinstance(kwargs.get('repo_group'), RepoGroup):
 
            kwargs['repo_group'] = kwargs['repo_group'].group_id
 

	
 
        form_data = self._get_repo_create_params(repo_name=name, **kwargs)
 
        cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create(form_data, cur_user)
 
        Session().commit()
 
        return Repository.get_by_repo_name(name)
 

	
 
    def create_fork(self, repo_to_fork, fork_name, **kwargs):
 
        repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
 

	
 
        form_data = self._get_repo_create_params(repo_name=fork_name,
 
                                            fork_parent_id=repo_to_fork,
 
                                            repo_type=repo_to_fork.repo_type,
 
                                            **kwargs)
 
        form_data['update_after_clone'] = False
 

	
 
        #TODO: fix it !!
 
        form_data['description'] = form_data['repo_description']
 
        form_data['private'] = form_data['repo_private']
 
        form_data['landing_rev'] = form_data['repo_landing_rev']
 

	
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create_fork(form_data, cur_user=owner)
 
        Session().commit()
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r
 
        return r
 

	
 
    def destroy_repo(self, repo_name, **kwargs):
 
        RepoModel().delete(repo_name, **kwargs)
 
        Session().commit()
 

	
kallithea/tests/functional/test_admin_notifications.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.model.db import User
 

	
 
from kallithea.model.user import UserModel
 
from kallithea.model.notification import NotificationModel
 
from kallithea.model.meta import Session
 
from kallithea.lib import helpers as h
 

	
 

	
 
class TestNotificationsController(TestController):
 
    def setUp(self):
 
        self.remove_all_notifications()
 

	
 
    def test_index(self):
 
        self.log_user()
 

	
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                          email='u1@example.com',
 
                                          firstname='u1', lastname='u1')
 
                                          firstname=u'u1', lastname=u'u1')
 
        u1 = u1.user_id
 

	
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('<div class="table">No notifications here yet</div>')
 

	
 
        cur_user = self._get_logged_user()
 
        notif = NotificationModel().create(created_by=u1, subject=u'test_notification_1',
 
                                           body=u'notification_1', recipients=[cur_user])
 
        Session().commit()
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('id="notification_%s"' % notif.notification_id)
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 

	
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                               email='u1@example.com',
 
                                               firstname='u1', lastname='u1')
 
                                               firstname=u'u1', lastname=u'u1')
 
        u2 = UserModel().create_or_update(username='u2', password='qweqwe',
 
                                               email='u2@example.com',
 
                                               firstname='u2', lastname='u2')
 
                                               firstname=u'u2', lastname=u'u2')
 

	
 
        # make notifications
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=u'test',
 
                                                  body=u'hi there',
 
                                                  recipients=[cur_user, u1, u2])
 
        Session().commit()
 
        u1 = User.get(u1.user_id)
 
        u2 = User.get(u2.user_id)
 

	
 
        # check DB
 
        get_notif = lambda un: [x.notification for x in un]
 
        self.assertEqual(get_notif(cur_user.notifications), [notification])
 
        self.assertEqual(get_notif(u1.notifications), [notification])
 
        self.assertEqual(get_notif(u2.notifications), [notification])
 
        cur_usr_id = cur_user.user_id
 

	
 
        response = self.app.post(
 
            url('notification', notification_id=notification.notification_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.assertEqual(response.body, 'ok')
 

	
 
        cur_user = User.get(cur_usr_id)
 
        self.assertEqual(cur_user.notifications, [])
 

	
 
    def test_show(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                          email='u1@example.com',
 
                                          firstname='u1', lastname='u1')
 
                                          firstname=u'u1', lastname=u'u1')
 
        u2 = UserModel().create_or_update(username='u2', password='qweqwe',
 
                                          email='u2@example.com',
 
                                          firstname='u2', lastname='u2')
 
                                          firstname=u'u2', lastname=u'u2')
 

	
 
        subject = u'test'
 
        notif_body = u'hi there'
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=subject,
 
                                                  body=notif_body,
 
                                                  recipients=[cur_user, u1, u2])
 

	
 
        response = self.app.get(url('notification',
 
                                    notification_id=notification.notification_id))
 

	
 
        response.mustcontain(subject)
 
        response.mustcontain(notif_body)
 

	
 
    def test_description_with_age(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 

	
 
        description = NotificationModel().make_description(notification)
 
        self.assertEqual(
 
            description,
 
            "{0} sent message {1}".format(
 
                cur_user.username,
 
                h.age(notification.created_on)
 
                )
 
            )
 

	
 
    def test_description_with_datetime(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 

	
 
        description = NotificationModel().make_description(notification, False)
 
        self.assertEqual(
 
            description,
 
            "{0} sent message at {1}".format(
 
                cur_user.username,
 
                h.fmt_date(notification.created_on)
 
                )
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
@@ -7,377 +7,377 @@ import urllib
 
from kallithea.lib import vcs
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.model.db import Repository, RepoGroup, UserRepoToPerm, User, \
 
    Permission
 
from kallithea.model.user import UserModel
 
from kallithea.tests import *
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.meta import Session
 
from kallithea.tests.fixture import Fixture, error_function
 

	
 
fixture = Fixture()
 

	
 

	
 
def _get_permission_for_user(user, repo):
 
    perm = UserRepoToPerm.query() \
 
                .filter(UserRepoToPerm.repository ==
 
                        Repository.get_by_repo_name(repo)) \
 
                .filter(UserRepoToPerm.user == User.get_by_username(user)) \
 
                .all()
 
    return perm
 

	
 

	
 
class _BaseTest(object):
 
    """
 
    Write all tests here
 
    """
 
    REPO = None
 
    REPO_TYPE = None
 
    NEW_REPO = None
 
    OTHER_TYPE_REPO = None
 
    OTHER_TYPE = None
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        pass
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        pass
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('repos'))
 

	
 
    def test_create(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        self.assertEqual(response.json, {u'result': True})
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name, repo_name))
 

	
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name)))
 
        except vcs.exceptions.VCSError:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name)
 
        Session().commit()
 

	
 
    def test_create_in_group(self):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = 'sometest_%s' % self.REPO_TYPE
 
        group_name = u'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description='test',
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 

	
 
        repo_name = 'ingroup'
 
        repo_name = u'ingroup'
 
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
 
        description = 'description for newly created repo'
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                _authentication_token=self.authentication_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        self.assertEqual(response.json, {u'result': True})
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 1)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_in_group_without_needed_permissions(self):
 
        usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        # avoid spurious RepoGroup DetachedInstanceError ...
 
        authentication_token = self.authentication_token()
 
        # revoke
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
 

	
 
        # disable on regular user
 
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
 
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
 
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
 
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
 
        Session().commit()
 

	
 
        ## create GROUP
 
        group_name = 'reg_sometest_%s' % self.REPO_TYPE
 
        group_name = u'reg_sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description='test',
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 

	
 
        group_name_allowed = 'reg_sometest_allowed_%s' % self.REPO_TYPE
 
        group_name_allowed = u'reg_sometest_allowed_%s' % self.REPO_TYPE
 
        gr_allowed = RepoGroupModel().create(group_name=group_name_allowed,
 
                                     group_description='test',
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_REGULAR_LOGIN)
 
        Session().commit()
 

	
 
        repo_name = 'ingroup'
 
        repo_name = u'ingroup'
 
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
 
        description = 'description for newly created repo'
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                _authentication_token=authentication_token))
 

	
 
        response.mustcontain('Invalid value')
 

	
 
        # user is allowed to create in this group
 
        repo_name = 'ingroup'
 
        repo_name = u'ingroup'
 
        repo_name_full = RepoGroup.url_sep().join([group_name_allowed, repo_name])
 
        description = 'description for newly created repo'
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr_allowed.group_id,
 
                                                _authentication_token=authentication_token))
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        self.assertEqual(response.json, {u'result': True})
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 1)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        RepoGroupModel().delete(group_name_allowed)
 
        Session().commit()
 

	
 
    def test_create_in_group_inherit_permissions(self):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = 'sometest_%s' % self.REPO_TYPE
 
        group_name = u'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description='test',
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_ADMIN_LOGIN)
 
        perm = Permission.get_by_key('repository.write')
 
        RepoGroupModel().grant_user_permission(gr, TEST_USER_REGULAR_LOGIN, perm)
 

	
 
        ## add repo permissions
 
        Session().commit()
 

	
 
        repo_name = 'ingroup_inherited_%s' % self.REPO_TYPE
 
        repo_name = u'ingroup_inherited_%s' % self.REPO_TYPE
 
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
 
        description = 'description for newly created repo'
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                repo_copy_permissions=True,
 
                                                _authentication_token=self.authentication_token()))
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        #check if inherited permissiona are applied
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 2)
 

	
 
        self.assertTrue(TEST_USER_REGULAR_LOGIN in [x.user.username
 
                                                    for x in inherited_perms])
 
        self.assertTrue('repository.write' in [x.permission.permission_name
 
                                               for x in inherited_perms])
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_remote_repo_wrong_clone_uri(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                clone_uri='http://127.0.0.1/repo',
 
                                                _authentication_token=self.authentication_token()))
 
        response.mustcontain('Invalid repository URL')
 

	
 

	
 
    def test_create_remote_repo_wrong_clone_uri_hg_svn(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                clone_uri='svn+http://127.0.0.1/repo',
 
                                                _authentication_token=self.authentication_token()))
 
        response.mustcontain('Invalid repository URL')
 

	
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete_%s' % self.REPO_TYPE
 
        description = 'description for newly created repo'
 
        repo_name = u'vcs_test_new_to_delete_%s' % self.REPO_TYPE
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_name=repo_name,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name, repo_name))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name)))
 
        except vcs.exceptions.VCSError:
 
            self.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.post(url('delete_repo', repo_name=repo_name),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 

	
 
        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
 

	
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
                                  False)
 

	
 
    def test_delete_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
@@ -451,169 +451,169 @@ class _BaseTest(object):
 
        self.assertTrue(len(perm), 1)
 
        self.assertEqual(perm[0].permission.permission_name, 'repository.read')
 
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)
 

	
 
        response = self.app.put(url('put_repo', repo_name=self.REPO),
 
                        fixture._get_repo_create_params(repo_private=1,
 
                                                repo_name=self.REPO,
 
                                                repo_type=self.REPO_TYPE,
 
                                                user=TEST_USER_ADMIN_LOGIN,
 
                                                _authentication_token=self.authentication_token()))
 
        self.checkSessionFlash(response,
 
                               msg='Repository %s updated successfully' % (self.REPO))
 
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, True)
 

	
 
        #now the repo default permission should be None
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        self.assertTrue(len(perm), 1)
 
        self.assertEqual(perm[0].permission.permission_name, 'repository.none')
 

	
 
        response = self.app.put(url('put_repo', repo_name=self.REPO),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=self.REPO,
 
                                                repo_type=self.REPO_TYPE,
 
                                                user=TEST_USER_ADMIN_LOGIN,
 
                                                _authentication_token=self.authentication_token()))
 
        self.checkSessionFlash(response,
 
                               msg='Repository %s updated successfully' % (self.REPO))
 
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)
 

	
 
        #we turn off private now the repo default permission should stay None
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        self.assertTrue(len(perm), 1)
 
        self.assertEqual(perm[0].permission.permission_name, 'repository.none')
 

	
 
        #update this permission back
 
        perm[0].permission = Permission.get_by_key('repository.read')
 
        Session().add(perm[0])
 
        Session().commit()
 

	
 
    def test_set_repo_fork_has_no_self_id(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        response = self.app.get(url('edit_repo_advanced', repo_name=self.REPO))
 
        opt = """<option value="%s">%s</option>""" % (repo.repo_id, self.REPO)
 
        response.mustcontain(no=[opt])
 

	
 
    def test_set_fork_of_other_repo(self):
 
        self.log_user()
 
        other_repo = 'other_%s' % self.REPO_TYPE
 
        other_repo = u'other_%s' % self.REPO_TYPE
 
        fixture.create_repo(other_repo, repo_type=self.REPO_TYPE)
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(other_repo)
 
        response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo2.repo_id, _authentication_token=self.authentication_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(other_repo)
 
        self.checkSessionFlash(response,
 
            'Marked repository %s as fork of %s' % (repo.repo_name, repo2.repo_name))
 

	
 
        assert repo.fork == repo2
 
        response = response.follow()
 
        # check if given repo is selected
 

	
 
        opt = """<option value="%s" selected="selected">%s</option>""" % (
 
                    repo2.repo_id, repo2.repo_name)
 
        response.mustcontain(opt)
 

	
 
        fixture.destroy_repo(other_repo, forks='detach')
 

	
 
    def test_set_fork_of_other_type_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
 
        response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo2.repo_id, _authentication_token=self.authentication_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
 
        self.checkSessionFlash(response,
 
            'Cannot set repository as fork of repository with other type')
 

	
 
    def test_set_fork_of_none(self):
 
        self.log_user()
 
        ## mark it as None
 
        response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=None, _authentication_token=self.authentication_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
 
        self.checkSessionFlash(response,
 
                               'Marked repository %s as fork of %s'
 
                               % (repo.repo_name, "Nothing"))
 
        assert repo.fork is None
 

	
 
    def test_set_fork_of_same_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo.repo_id, _authentication_token=self.authentication_token()))
 
        self.checkSessionFlash(response,
 
                               'An error occurred during this operation')
 

	
 
    def test_create_on_top_level_without_permissions(self):
 
        usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        # revoke
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
 

	
 
        # disable on regular user
 
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
 
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
 
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
 
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
 
        Session().commit()
 

	
 

	
 
        user = User.get(usr['user_id'])
 

	
 
        repo_name = self.NEW_REPO+'no_perms'
 
        repo_name = self.NEW_REPO + u'no_perms'
 
        description = 'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 

	
 
        response.mustcontain('<span class="error-message">Invalid value</span>')
 

	
 
        RepoModel().delete(repo_name)
 
        Session().commit()
 

	
 
    @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
 
    def test_create_repo_when_filesystem_op_fails(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 

	
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 

	
 
        self.checkSessionFlash(response,
 
                               'Error creating repository %s' % repo_name)
 
        # repo must not be in db
 
        repo = Repository.get_by_repo_name(repo_name)
 
        self.assertEqual(repo, None)
 

	
 
        # repo must not be in filesystem !
 
        self.assertFalse(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)))
 

	
 
class TestAdminReposControllerGIT(TestController, _BaseTest):
 
    REPO = GIT_REPO
 
    REPO_TYPE = 'git'
 
    NEW_REPO = NEW_GIT_REPO
 
    OTHER_TYPE_REPO = HG_REPO
 
    OTHER_TYPE = 'hg'
 

	
 

	
 
class TestAdminReposControllerHG(TestController, _BaseTest):
 
    REPO = HG_REPO
 
    REPO_TYPE = 'hg'
 
    NEW_REPO = NEW_HG_REPO
 
    OTHER_TYPE_REPO = GIT_REPO
kallithea/tests/functional/test_admin_user_groups.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from kallithea.tests import *
 
from kallithea.model.db import UserGroup, UserGroupToPerm, Permission
 
from kallithea.model.meta import Session
 

	
 
TEST_USER_GROUP = 'admins_test'
 
TEST_USER_GROUP = u'admins_test'
 

	
 

	
 
class TestAdminUsersGroupsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users_groups'))
 
        # Test response...
 

	
 
    def test_create(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': 'DESC',
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 

	
 
        self.checkSessionFlash(response,
 
                               'Created user group <a href="/_admin/user_groups/')
 
        self.checkSessionFlash(response,
 
                               '/edit">%s</a>' % TEST_USER_GROUP)
 

	
 
    def test_new(self):
 
        response = self.app.get(url('new_users_group'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('users_group', id=1), status=403)
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('users_group', id=1),
 
                                 params=dict(_method='put', _authentication_token=self.authentication_token()))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name':users_group_name,
 
                                  'user_group_description': 'DESC',
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 

	
 
        self.checkSessionFlash(response,
 
                               'Created user group ')
 

	
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).one()
 

	
 
        response = self.app.post(url('users_group', id=gr.users_group_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 

	
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 

	
 
    def test_default_perms_enable_repository_read_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': 'DESC',
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 

	
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        self.checkSessionFlash(response,
 
                               'Created user group ')
 
        ## ENABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('edit_user_group_default_perms',
 
                                    id=ug.users_group_id),
 
                                 {'create_repo_perm': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.repository')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
        p3 = Permission.get_by_key('hg.fork.none')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.put(
 
            url('edit_user_group_default_perms', id=ug.users_group_id),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
        p3 = Permission.get_by_key('hg.fork.none')
 

	
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 

	
 
        # DELETE !
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        ugid = ug.users_group_id
 
        response = self.app.post(url('users_group', id=ug.users_group_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        response = response.follow()
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 
        p = Permission.get_by_key('hg.create.repository')
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group_id == ugid).all()
 
        perms = [[x.users_group_id,
 
                  x.permission_id, ] for x in perms]
 
        self.assertEqual(perms, [])
 

	
 
    def test_default_perms_enable_repository_fork_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': 'DESC',
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 

	
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        self.checkSessionFlash(response,
 
                               'Created user group ')
 
        ## ENABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('edit_user_group_default_perms',
 
                                    id=ug.users_group_id),
 
                                {'fork_repo_perm': True, '_authentication_token': self.authentication_token()})
 

	
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
        p3 = Permission.get_by_key('hg.fork.repository')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('edit_user_group_default_perms', id=ug.users_group_id),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
        p3 = Permission.get_by_key('hg.fork.none')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 

	
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from sqlalchemy.orm.exc import NoResultFound
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.db import User, Permission, UserIpMap, UserApiKeys
 
from kallithea.lib.auth import check_password
 
from kallithea.model.user import UserModel
 
from kallithea.model import validators
 
from kallithea.lib import helpers as h
 
from kallithea.model.meta import Session
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestAdminUsersController(TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users'))
 
        # Test response...
 

	
 
    def test_create(self):
 
        self.log_user()
 
        username = 'newtestuser'
 
        password = 'test12'
 
        password_confirmation = password
 
        name = 'name'
 
        lastname = 'lastname'
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'mail@example.com'
 

	
 
        response = self.app.post(url('users'),
 
            {'username': username,
 
             'password': password,
 
             'password_confirmation': password_confirmation,
 
             'firstname': name,
 
             'active': True,
 
             'lastname': lastname,
 
             'extern_name': 'internal',
 
             'extern_type': 'internal',
 
             'email': email,
 
             '_authentication_token': self.authentication_token()})
 

	
 
        self.checkSessionFlash(response, '''Created user <a href="/_admin/users/''')
 
        self.checkSessionFlash(response, '''/edit">%s</a>''' % (username))
 

	
 
        new_user = Session().query(User). \
 
            filter(User.username == username).one()
 

	
 
        self.assertEqual(new_user.username, username)
 
        self.assertEqual(check_password(password, new_user.password), True)
 
        self.assertEqual(new_user.name, name)
 
        self.assertEqual(new_user.lastname, lastname)
 
        self.assertEqual(new_user.email, email)
 

	
 
        response.follow()
 
        response = response.follow()
 
        response.mustcontain("""newtestuser""")
 

	
 
    def test_create_err(self):
 
        self.log_user()
 
        username = 'new_user'
 
        password = ''
 
        name = 'name'
 
        lastname = 'lastname'
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'errmail.example.com'
 

	
 
        response = self.app.post(url('users'), {'username': username,
 
                                               'password': password,
 
                                               'name': name,
 
                                               'active': False,
 
                                               'lastname': lastname,
 
                                               'email': email,
 
                                               '_authentication_token': self.authentication_token()})
 

	
 
        msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
 
        msg = h.html_escape(msg % {'username': 'new_user'})
 
        response.mustcontain("""<span class="error-message">%s</span>""" % msg)
 
        response.mustcontain("""<span class="error-message">Please enter a value</span>""")
 
        response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
 

	
 
        def get_user():
 
            Session().query(User).filter(User.username == username).one()
 

	
 
        self.assertRaises(NoResultFound, get_user), 'found user in database'
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_user'))
 

	
 
    @parameterized.expand(
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
         ('extern_name', {'extern_name': 'test'}),
 
         ('extern_name', {'extern_name': None}),
 
         ('active', {'active': False}),
 
         ('active', {'active': True}),
 
         ('email', {'email': 'someemail@example.com'}),
 
        # ('new_password', {'new_password': 'foobar123',
 
        #                   'password_confirmation': 'foobar123'})
 
        ])
 
    def test_update(self, name, attrs):
 
        self.log_user()
 
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
 
                                  email='testme@example.com',
 
                                  extern_type='internal',
 
                                  extern_name=self.test_user_1,
 
                                  skip_if_exists=True)
 
        Session().commit()
 
        params = usr.get_api_data(True)
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update(attrs)
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            #cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            #cannot update this via form, expected value is original one
 
            params['extern_name'] = self.test_user_1
 
            # special case since this user is not
 
                                          # logged in yet his data is not filled
 
                                          # so we use creation data
 

	
 
        params.update({'_authentication_token': self.authentication_token()})
 
        response = self.app.put(url('user', id=usr.user_id), params)
 
        self.checkSessionFlash(response, 'User updated successfully')
 
        params.pop('_authentication_token')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        self.assertEqual(params, updated_params)
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        username = 'newtestuserdeleteme'
 

	
 
        fixture.create_user(name=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('user', id=new_user.user_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 

	
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_err(self):
 
        self.log_user()
 
        username = 'repoerr'
 
        reponame = 'repoerr_fail'
 
        reponame = u'repoerr_fail'
 

	
 
        fixture.create_user(name=username)
 
        fixture.create_repo(name=reponame, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('user', id=new_user.user_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
                               'owns 1 repositories and cannot be removed. '
 
                               'Switch owners or remove those repositories: '
 
                               '%s' % (username, reponame))
 

	
 
        response = self.app.post(url('delete_repo', repo_name=reponame),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
 

	
 
        response = self.app.post(url('user', id=new_user.user_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_group_err(self):
 
        self.log_user()
 
        username = 'repogrouperr'
 
        groupname = 'repogroup_fail'
 
        groupname = u'repogroup_fail'
 

	
 
        fixture.create_user(name=username)
 
        fixture.create_repo_group(name=groupname, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('user', id=new_user.user_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
                               'owns 1 repository groups and cannot be removed. '
 
                               'Switch owners or remove those repository groups: '
 
                               '%s' % (username, groupname))
 

	
 
        # Relevant _if_ the user deletion succeeded to make sure we can render groups without owner
 
        # rg = RepoGroup.get_by_group_name(group_name=groupname)
 
        # response = self.app.get(url('repos_groups', id=rg.group_id))
 

	
 
        response = self.app.post(url('delete_repo_group', group_name=groupname),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'Removed repository group %s' % groupname)
 

	
 
        response = self.app.post(url('user', id=new_user.user_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_user_group_err(self):
 
        self.log_user()
 
        username = 'usergrouperr'
 
        groupname = 'usergroup_fail'
 
        groupname = u'usergroup_fail'
 

	
 
        fixture.create_user(name=username)
 
        ug = fixture.create_user_group(name=groupname, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('user', id=new_user.user_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
                               'owns 1 user groups and cannot be removed. '
 
                               'Switch owners or remove those user groups: '
 
                               '%s' % (username, groupname))
 

	
 
        # TODO: why do this fail?
 
        #response = self.app.delete(url('delete_users_group', id=groupname))
 
        #self.checkSessionFlash(response, 'Removed user group %s' % groupname)
 

	
 
        fixture.destroy_user_group(ug.users_group_id)
 

	
 
        response = self.app.post(url('user', id=new_user.user_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_show(self):
 
        response = self.app.get(url('user', id=1))
 

	
 
    def test_edit(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        response = self.app.get(url('edit_user', id=user.user_id))
 

	
 
    def test_add_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname='a',
 
                                            lastname='b')
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(user, perm_create), False)
 

	
 
            response = self.app.post(url('edit_user_perms', id=uid),
 
                                     params=dict(_method='put',
 
                                                 create_repo_perm=True,
 
                                                 _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), True)
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname='a',
 
                                            lastname='b')
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(user, perm_create), False)
 

	
 
            response = self.app.post(url('edit_user_perms', id=uid),
 
                                     params=dict(_method='put', _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), True)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), False)
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_add_perm_fork_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.fork.none')
 
        perm_fork = Permission.get_by_key('hg.fork.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname='a',
 
                                            lastname='b')
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(user, perm_fork), False)
 

	
 
            response = self.app.post(url('edit_user_perms', id=uid),
 
                                     params=dict(_method='put',
 
                                                 create_repo_perm=True,
 
                                                 _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), True)
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_fork_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.fork.none')
 
        perm_fork = Permission.get_by_key('hg.fork.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname='a',
 
                                            lastname='b')
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(user, perm_fork), False)
 

	
 
            response = self.app.post(url('edit_user_perms', id=uid),
 
                                     params=dict(_method='put', _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), True)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), False)
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_ips(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_ips', id=user.user_id))
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    @parameterized.expand([
 
        ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
 
        ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
 
        ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
 
        ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
 
        ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
 
        ('127_bad_ip', 'foobar', 'foobar', True),
 
    ])
 
    def test_add_ip(self, test_name, ip, ip_range, failure):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.put(url('edit_user_ips', id=user_id),
 
                                params=dict(new_ip=ip, _authentication_token=self.authentication_token()))
 

	
 
        if failure:
 
            self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response.mustcontain(no=[ip])
 
            response.mustcontain(no=[ip_range])
kallithea/tests/functional/test_compare.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from kallithea.tests import *
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.meta import Session
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 
def _commit_div(sha, msg):
 
    return """<div id="C-%s" class="message">%s</div>""" % (sha, msg)
 

	
 

	
 
class TestCompareController(TestController):
 

	
 
    def setUp(self):
 
        self.r1_id = None
 
        self.r2_id = None
 

	
 
    def tearDown(self):
 
        if self.r2_id:
 
            RepoModel().delete(self.r2_id)
 
        if self.r1_id:
 
            RepoModel().delete(self.r1_id)
 
        Session().commit()
 
        Session.remove()
 

	
 
    def test_compare_forks_on_branch_extra_commits_hg(self):
 
        self.log_user()
 
        repo1 = fixture.create_repo('one', repo_type='hg',
 
        repo1 = fixture.create_repo(u'one', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
        self.r1_id = repo1.repo_id
 
        #commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        #fork this repo
 
        repo2 = fixture.create_fork('one', 'one-fork')
 
        repo2 = fixture.create_fork(u'one', u'one-fork')
 
        self.r2_id = repo2.repo_id
 

	
 
        #add two extra commit into fork
 
        cs1 = fixture.commit_change(repo2.repo_name, filename='file1',
 
                content='line1\nline2\n', message='commit2', vcs_type='hg',
 
                parent=cs0)
 

	
 
        cs2 = fixture.commit_change(repo2.repo_name, filename='file1',
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='hg', parent=cs1)
 

	
 
        rev1 = 'default'
 
        rev2 = 'default'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_repo=repo2.repo_name,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (repo1.repo_name, rev2))
 
        response.mustcontain('%s@%s' % (repo2.repo_name, rev1))
 
        response.mustcontain("""Showing 2 commits""")
 
        response.mustcontain("""1 file changed with 2 insertions and 0 deletions""")
 

	
 
        response.mustcontain(_commit_div(cs1.raw_id, 'commit2'))
 
        response.mustcontain(_commit_div(cs2.raw_id, 'commit3'))
 

	
 
        response.mustcontain("""<a href="/%s/changeset/%s">r1:%s</a>""" % (repo2.repo_name, cs1.raw_id, cs1.short_id))
 
        response.mustcontain("""<a href="/%s/changeset/%s">r2:%s</a>""" % (repo2.repo_name, cs2.raw_id, cs2.short_id))
 
        ## files
 
        response.mustcontain("""<a href="#C--826e8142e6ba">file1</a>""")
 
        #swap
 
        response.mustcontain("""<a class="btn btn-small" href="/%s/compare/branch@%s...branch@%s?other_repo=%s&amp;merge=True"><i class="icon-arrows-cw"></i> Swap</a>""" % (repo2.repo_name, rev1, rev2, repo1.repo_name))
 

	
 
    def test_compare_forks_on_branch_extra_commits_git(self):
 
        self.log_user()
 
        repo1 = fixture.create_repo('one-git', repo_type='git',
 
        repo1 = fixture.create_repo(u'one-git', repo_type='git',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
        self.r1_id = repo1.repo_id
 
        #commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='git',
 
                parent=None, newfile=True)
 

	
 
        #fork this repo
 
        repo2 = fixture.create_fork('one-git', 'one-git-fork')
 
        repo2 = fixture.create_fork(u'one-git', u'one-git-fork')
 
        self.r2_id = repo2.repo_id
 

	
 
        #add two extra commit into fork
 
        cs1 = fixture.commit_change(repo2.repo_name, filename='file1',
 
                content='line1\nline2\n', message='commit2', vcs_type='git',
 
                parent=cs0)
 

	
 
        cs2 = fixture.commit_change(repo2.repo_name, filename='file1',
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='git', parent=cs1)
 

	
 
        rev1 = 'master'
 
        rev2 = 'master'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_repo=repo2.repo_name,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (repo1.repo_name, rev2))
 
        response.mustcontain('%s@%s' % (repo2.repo_name, rev1))
 
        response.mustcontain("""Showing 2 commits""")
 
        response.mustcontain("""1 file changed with 2 insertions and 0 deletions""")
 

	
 
        response.mustcontain(_commit_div(cs1.raw_id, 'commit2'))
 
        response.mustcontain(_commit_div(cs2.raw_id, 'commit3'))
 

	
 
        response.mustcontain("""<a href="/%s/changeset/%s">r1:%s</a>""" % (repo2.repo_name, cs1.raw_id, cs1.short_id))
 
        response.mustcontain("""<a href="/%s/changeset/%s">r2:%s</a>""" % (repo2.repo_name, cs2.raw_id, cs2.short_id))
 
        ## files
 
        response.mustcontain("""<a href="#C--826e8142e6ba">file1</a>""")
 
        #swap
 
        response.mustcontain("""<a class="btn btn-small" href="/%s/compare/branch@%s...branch@%s?other_repo=%s&amp;merge=True"><i class="icon-arrows-cw"></i> Swap</a>""" % (repo2.repo_name, rev1, rev2, repo1.repo_name))
 

	
 
    def test_compare_forks_on_branch_extra_commits_origin_has_incomming_hg(self):
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo('one', repo_type='hg',
 
        repo1 = fixture.create_repo(u'one', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 

	
 
        self.r1_id = repo1.repo_id
 

	
 
        #commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        #fork this repo
 
        repo2 = fixture.create_fork('one', 'one-fork')
 
        repo2 = fixture.create_fork(u'one', u'one-fork')
 
        self.r2_id = repo2.repo_id
 

	
 
        #now commit something to origin repo
 
        cs1_prim = fixture.commit_change(repo1.repo_name, filename='file2',
 
                content='line1file2\n', message='commit2', vcs_type='hg',
 
                parent=cs0, newfile=True)
 

	
 
        #add two extra commit into fork
 
        cs1 = fixture.commit_change(repo2.repo_name, filename='file1',
 
                content='line1\nline2\n', message='commit2', vcs_type='hg',
 
                parent=cs0)
 

	
 
        cs2 = fixture.commit_change(repo2.repo_name, filename='file1',
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='hg', parent=cs1)
 

	
 
        rev1 = 'default'
 
        rev2 = 'default'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_repo=repo2.repo_name,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (repo1.repo_name, rev2))
 
        response.mustcontain('%s@%s' % (repo2.repo_name, rev1))
 
        response.mustcontain("""Showing 2 commits""")
 
        response.mustcontain("""1 file changed with 2 insertions and 0 deletions""")
 

	
 
        response.mustcontain(_commit_div(cs1.raw_id, 'commit2'))
 
        response.mustcontain(_commit_div(cs2.raw_id, 'commit3'))
 

	
 
        response.mustcontain("""<a href="/%s/changeset/%s">r1:%s</a>""" % (repo2.repo_name, cs1.raw_id, cs1.short_id))
 
        response.mustcontain("""<a href="/%s/changeset/%s">r2:%s</a>""" % (repo2.repo_name, cs2.raw_id, cs2.short_id))
 
        ## files
 
        response.mustcontain("""<a href="#C--826e8142e6ba">file1</a>""")
 
        #swap
 
        response.mustcontain("""<a class="btn btn-small" href="/%s/compare/branch@%s...branch@%s?other_repo=%s&amp;merge=True"><i class="icon-arrows-cw"></i> Swap</a>""" % (repo2.repo_name, rev1, rev2, repo1.repo_name))
 

	
 
    def test_compare_forks_on_branch_extra_commits_origin_has_incomming_git(self):
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo('one-git', repo_type='git',
 
        repo1 = fixture.create_repo(u'one-git', repo_type='git',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 

	
 
        self.r1_id = repo1.repo_id
 

	
 
        #commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='git',
 
                parent=None, newfile=True)
 

	
 
        #fork this repo
 
        repo2 = fixture.create_fork('one-git', 'one-git-fork')
 
        repo2 = fixture.create_fork(u'one-git', u'one-git-fork')
 
        self.r2_id = repo2.repo_id
 

	
 
        #now commit something to origin repo
 
        cs1_prim = fixture.commit_change(repo1.repo_name, filename='file2',
 
                content='line1file2\n', message='commit2', vcs_type='git',
 
                parent=cs0, newfile=True)
 

	
 
        #add two extra commit into fork
 
        cs1 = fixture.commit_change(repo2.repo_name, filename='file1',
 
                content='line1\nline2\n', message='commit2', vcs_type='git',
 
                parent=cs0)
 

	
 
        cs2 = fixture.commit_change(repo2.repo_name, filename='file1',
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='git', parent=cs1)
 

	
 
        rev1 = 'master'
 
        rev2 = 'master'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_repo=repo2.repo_name,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (repo1.repo_name, rev2))
 
        response.mustcontain('%s@%s' % (repo2.repo_name, rev1))
 
        response.mustcontain("""Showing 2 commits""")
 
        response.mustcontain("""1 file changed with 2 insertions and 0 deletions""")
 

	
 
        response.mustcontain(_commit_div(cs1.raw_id, 'commit2'))
 
        response.mustcontain(_commit_div(cs2.raw_id, 'commit3'))
 

	
 
        response.mustcontain("""<a href="/%s/changeset/%s">r1:%s</a>""" % (repo2.repo_name, cs1.raw_id, cs1.short_id))
 
        response.mustcontain("""<a href="/%s/changeset/%s">r2:%s</a>""" % (repo2.repo_name, cs2.raw_id, cs2.short_id))
 
        ## files
 
        response.mustcontain("""<a href="#C--826e8142e6ba">file1</a>""")
 
        #swap
 
        response.mustcontain("""<a class="btn btn-small" href="/%s/compare/branch@%s...branch@%s?other_repo=%s&amp;merge=True"><i class="icon-arrows-cw"></i> Swap</a>""" % (repo2.repo_name, rev1, rev2, repo1.repo_name))
 

	
 
    def test_compare_cherry_pick_changesets_from_bottom(self):
 

	
 
#        repo1:
 
#            cs0:
 
#            cs1:
 
#        repo1-fork- in which we will cherry pick bottom changesets
 
#            cs0:
 
#            cs1:
 
#            cs2: x
 
#            cs3: x
 
#            cs4: x
 
#            cs5:
 
        #make repo1, and cs1+cs2
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo('repo1', repo_type='hg',
 
        repo1 = fixture.create_repo(u'repo1', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
        self.r1_id = repo1.repo_id
 

	
 
        #commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\n', message='commit2', vcs_type='hg',
 
                parent=cs0)
 
        #fork this repo
 
        repo2 = fixture.create_fork('repo1', 'repo1-fork')
 
        repo2 = fixture.create_fork(u'repo1', u'repo1-fork')
 
        self.r2_id = repo2.repo_id
 
        #now make cs3-6
 
        cs2 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='hg', parent=cs1)
 
        cs3 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\nline4\n', message='commit4',
 
                vcs_type='hg', parent=cs2)
 
        cs4 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\nline4\nline5\n',
 
                message='commit5', vcs_type='hg', parent=cs3)
 
        cs5 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\nline4\nline5\nline6\n',
 
                message='commit6', vcs_type='hg', parent=cs4)
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=repo2.repo_name,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=cs1.short_id,  # parent of cs2, in repo2
 
                                    other_repo=repo1.repo_name,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=cs4.short_id,
 
                                    merge='True',
 
                                    ))
 
        response.mustcontain('%s@%s' % (repo2.repo_name, cs1.short_id))
 
        response.mustcontain('%s@%s' % (repo1.repo_name, cs4.short_id))
 
        response.mustcontain("""Showing 3 commits""")
 
        response.mustcontain("""1 file changed with 3 insertions and 0 deletions""")
 

	
 
        response.mustcontain(_commit_div(cs2.raw_id, 'commit3'))
 
        response.mustcontain(_commit_div(cs3.raw_id, 'commit4'))
 
        response.mustcontain(_commit_div(cs4.raw_id, 'commit5'))
 

	
 
        response.mustcontain("""<a href="/%s/changeset/%s">r2:%s</a>""" % (repo1.repo_name, cs2.raw_id, cs2.short_id))
 
        response.mustcontain("""<a href="/%s/changeset/%s">r3:%s</a>""" % (repo1.repo_name, cs3.raw_id, cs3.short_id))
 
        response.mustcontain("""<a href="/%s/changeset/%s">r4:%s</a>""" % (repo1.repo_name, cs4.raw_id, cs4.short_id))
 
        ## files
 
        response.mustcontain("""#C--826e8142e6ba">file1</a>""")
 

	
 
    def test_compare_cherry_pick_changesets_from_top(self):
 
#        repo1:
 
#            cs0:
 
#            cs1:
 
#        repo1-fork- in which we will cherry pick bottom changesets
 
#            cs0:
 
#            cs1:
 
#            cs2:
 
#            cs3: x
 
#            cs4: x
 
#            cs5: x
 
#
 
        #make repo1, and cs1+cs2
 
        self.log_user()
 
        repo1 = fixture.create_repo('repo1', repo_type='hg',
 
        repo1 = fixture.create_repo(u'repo1', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
        self.r1_id = repo1.repo_id
 

	
 
        #commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\n', message='commit2', vcs_type='hg',
 
                parent=cs0)
 
        #fork this repo
 
        repo2 = fixture.create_fork('repo1', 'repo1-fork')
 
        repo2 = fixture.create_fork(u'repo1', u'repo1-fork')
 
        self.r2_id = repo2.repo_id
 
        #now make cs3-6
 
        cs2 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='hg', parent=cs1)
 
        cs3 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\nline4\n', message='commit4',
 
                vcs_type='hg', parent=cs2)
 
        cs4 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\nline4\nline5\n',
 
                message='commit5', vcs_type='hg', parent=cs3)
 
        cs5 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\nline4\nline5\nline6\n',
 
                message='commit6', vcs_type='hg', parent=cs4)
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=cs2.short_id, # parent of cs3, not in repo2
 
                                    other_ref_type="rev",
 
                                    other_ref_name=cs5.short_id,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (repo1.repo_name, cs2.short_id))
 
        response.mustcontain('%s@%s' % (repo1.repo_name, cs5.short_id))
 
        response.mustcontain("""Showing 3 commits""")
 
        response.mustcontain("""1 file changed with 3 insertions and 0 deletions""")
 

	
 
        response.mustcontain(_commit_div(cs3.raw_id, 'commit4'))
 
        response.mustcontain(_commit_div(cs4.raw_id, 'commit5'))
 
        response.mustcontain(_commit_div(cs5.raw_id, 'commit6'))
 

	
 
        response.mustcontain("""<a href="/%s/changeset/%s">r3:%s</a>""" % (repo1.repo_name, cs3.raw_id, cs3.short_id))
 
        response.mustcontain("""<a href="/%s/changeset/%s">r4:%s</a>""" % (repo1.repo_name, cs4.raw_id, cs4.short_id))
 
        response.mustcontain("""<a href="/%s/changeset/%s">r5:%s</a>""" % (repo1.repo_name, cs5.raw_id, cs5.short_id))
 
        ## files
 
        response.mustcontain("""#C--826e8142e6ba">file1</a>""")
 

	
 
    def test_compare_cherry_pick_changeset_mixed_branches(self):
 
        #TODO: write this
 
        assert 1
 

	
 
    def test_compare_remote_branches_hg(self):
 
        self.log_user()
 

	
 
        repo2 = fixture.create_fork(HG_REPO, HG_FORK)
 
        self.r2_id = repo2.repo_id
 
        rev1 = '56349e29c2af'
 
@@ -400,189 +400,189 @@ class TestCompareController(TestControll
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (HG_REPO, rev1))
 
        response.mustcontain('%s@%s' % (HG_FORK, rev2))
 
        ## outgoing changesets between those revisions
 

	
 
        response.mustcontain("""<a href="/%s/changeset/2dda4e345facb0ccff1a191052dd1606dba6781d">r4:2dda4e345fac</a>""" % (HG_FORK))
 
        response.mustcontain("""<a href="/%s/changeset/6fff84722075f1607a30f436523403845f84cd9e">r5:6fff84722075</a>""" % (HG_FORK))
 
        response.mustcontain("""<a href="/%s/changeset/7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7">r6:%s</a>""" % (HG_FORK, rev2))
 

	
 
        ## files
 
        response.mustcontain("""<a href="#C--9c390eb52cd6">vcs/backends/hg.py</a>""")
 
        response.mustcontain("""<a href="#C--41b41c1f2796">vcs/backends/__init__.py</a>""")
 
        response.mustcontain("""<a href="#C--2f574d260608">vcs/backends/base.py</a>""")
 

	
 
    def test_compare_remote_branches_git(self):
 
        self.log_user()
 

	
 
        repo2 = fixture.create_fork(GIT_REPO, GIT_FORK)
 
        self.r2_id = repo2.repo_id
 
        rev1 = '102607b09cdd60e2793929c4f90478be29f85a17'
 
        rev2 = 'd7e0d30fbcae12c90680eb095a4f5f02505ce501'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=GIT_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    other_repo=GIT_FORK,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (GIT_REPO, rev1))
 
        response.mustcontain('%s@%s' % (GIT_FORK, rev2))
 
        ## outgoing changesets between those revisions
 

	
 
        response.mustcontain("""<a href="/%s/changeset/49d3fd156b6f7db46313fac355dca1a0b94a0017">r4:49d3fd156b6f</a>""" % (GIT_FORK))
 
        response.mustcontain("""<a href="/%s/changeset/2d1028c054665b962fa3d307adfc923ddd528038">r5:2d1028c05466</a>""" % (GIT_FORK))
 
        response.mustcontain("""<a href="/%s/changeset/d7e0d30fbcae12c90680eb095a4f5f02505ce501">r6:%s</a>""" % (GIT_FORK, rev2[:12]))
 

	
 
        ## files
 
        response.mustcontain("""<a href="#C--9c390eb52cd6">vcs/backends/hg.py</a>""")
 
        response.mustcontain("""<a href="#C--41b41c1f2796">vcs/backends/__init__.py</a>""")
 
        response.mustcontain("""<a href="#C--2f574d260608">vcs/backends/base.py</a>""")
 

	
 
    def test_org_repo_new_commits_after_forking_simple_diff_hg(self):
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo('one', repo_type='hg',
 
        repo1 = fixture.create_repo(u'one', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 

	
 
        self.r1_id = repo1.repo_id
 
        r1_name = repo1.repo_name
 

	
 
        cs0 = fixture.commit_change(repo=r1_name, filename='file1',
 
                content='line1', message='commit1', vcs_type='hg', newfile=True)
 
        Session().commit()
 
        self.assertEqual(repo1.scm_instance.revisions, [cs0.raw_id])
 
        #fork the repo1
 
        repo2 = fixture.create_repo('one-fork', repo_type='hg',
 
        repo2 = fixture.create_repo(u'one-fork', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN,
 
                                    clone_uri=repo1.repo_full_path,
 
                                    fork_of='one')
 
        Session().commit()
 
        self.assertEqual(repo2.scm_instance.revisions, [cs0.raw_id])
 
        self.r2_id = repo2.repo_id
 
        r2_name = repo2.repo_name
 

	
 

	
 
        cs1 = fixture.commit_change(repo=r2_name, filename='file1-fork',
 
                content='file1-line1-from-fork', message='commit1-fork',
 
                vcs_type='hg', parent=repo2.scm_instance[-1], newfile=True)
 

	
 
        cs2 = fixture.commit_change(repo=r2_name, filename='file2-fork',
 
                content='file2-line1-from-fork', message='commit2-fork',
 
                vcs_type='hg', parent=cs1, newfile=True)
 

	
 
        cs3 = fixture.commit_change(repo=r2_name, filename='file3-fork',
 
                content='file3-line1-from-fork', message='commit3-fork',
 
                vcs_type='hg', parent=cs2, newfile=True)
 
        #compare !
 
        rev1 = 'default'
 
        rev2 = 'default'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=r2_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev2,
 
                                    other_repo=r1_name,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (r2_name, rev1))
 
        response.mustcontain('%s@%s' % (r1_name, rev2))
 
        response.mustcontain('No files')
 
        response.mustcontain('No changesets')
 

	
 
        cs0 = fixture.commit_change(repo=r1_name, filename='file2',
 
                content='line1-added-after-fork', message='commit2-parent',
 
                vcs_type='hg', parent=None, newfile=True)
 

	
 
        #compare !
 
        rev1 = 'default'
 
        rev2 = 'default'
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=r2_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev2,
 
                                    other_repo=r1_name,
 
                                    merge='1',
 
                                    ))
 

	
 
        response.mustcontain('%s@%s' % (r2_name, rev1))
 
        response.mustcontain('%s@%s' % (r1_name, rev2))
 

	
 
        response.mustcontain("""commit2-parent""")
 
        response.mustcontain("""1 file changed with 1 insertions and 0 deletions""")
 
        response.mustcontain("""line1-added-after-fork""")
 

	
 
    def test_org_repo_new_commits_after_forking_simple_diff_git(self):
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo('one-git', repo_type='git',
 
        repo1 = fixture.create_repo(u'one-git', repo_type='git',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 

	
 
        self.r1_id = repo1.repo_id
 
        r1_name = repo1.repo_name
 

	
 
        cs0 = fixture.commit_change(repo=r1_name, filename='file1',
 
                content='line1', message='commit1', vcs_type='git',
 
                newfile=True)
 
        Session().commit()
 
        self.assertEqual(repo1.scm_instance.revisions, [cs0.raw_id])
 
        #fork the repo1
 
        repo2 = fixture.create_repo('one-git-fork', repo_type='git',
 
        repo2 = fixture.create_repo(u'one-git-fork', repo_type='git',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN,
 
                                    clone_uri=repo1.repo_full_path,
 
                                    fork_of='one-git')
 
        Session().commit()
 
        self.assertEqual(repo2.scm_instance.revisions, [cs0.raw_id])
 
        self.r2_id = repo2.repo_id
 
        r2_name = repo2.repo_name
 

	
 

	
 
        cs1 = fixture.commit_change(repo=r2_name, filename='file1-fork',
 
                content='file1-line1-from-fork', message='commit1-fork',
 
                vcs_type='git', parent=repo2.scm_instance[-1], newfile=True)
 

	
 
        cs2 = fixture.commit_change(repo=r2_name, filename='file2-fork',
 
                content='file2-line1-from-fork', message='commit2-fork',
 
                vcs_type='git', parent=cs1, newfile=True)
 

	
 
        cs3 = fixture.commit_change(repo=r2_name, filename='file3-fork',
 
                content='file3-line1-from-fork', message='commit3-fork',
 
                vcs_type='git', parent=cs2, newfile=True)
 
        #compare !
 
        rev1 = 'master'
 
        rev2 = 'master'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=r2_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev2,
 
                                    other_repo=r1_name,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (r2_name, rev1))
 
        response.mustcontain('%s@%s' % (r1_name, rev2))
 
        response.mustcontain('No files')
 
        response.mustcontain('No changesets')
 

	
 
        cs0 = fixture.commit_change(repo=r1_name, filename='file2',
 
                content='line1-added-after-fork', message='commit2-parent',
 
                vcs_type='git', parent=None, newfile=True)
 

	
 
        #compare !
 
        rev1 = 'master'
 
        rev2 = 'master'
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=r2_name,
kallithea/tests/functional/test_files.py
Show inline comments
 
@@ -329,434 +329,434 @@ removed extra unicode conversion in diff
 
                                 params={
 
                                    'content': '',
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No content')
 

	
 
    def test_add_file_into_hg_missing_filename(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No filename')
 

	
 
    @parameterized.expand([
 
        ('/abs', 'foo'),
 
        ('../rel', 'foo'),
 
        ('file/../foo', 'foo'),
 
    ])
 
    def test_add_file_into_hg_bad_filenames(self, location, filename):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'Location must be relative path and must not contain .. in path')
 

	
 
    @parameterized.expand([
 
        (1, '', 'foo.txt'),
 
        (2, 'dir', 'foo.rst'),
 
        (3, 'rel/dir', 'foo.bar'),
 
    ])
 
    def test_add_file_into_hg(self, cnt, location, filename):
 
        self.log_user()
 
        repo = fixture.create_repo('commit-test-%s' % cnt, repo_type='hg')
 
        repo = fixture.create_repo(u'commit-test-%s' % cnt, repo_type='hg')
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - add file
 
    def test_add_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'))
 

	
 
    def test_add_file_into_git_missing_content(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                     'content': '',
 
                                     '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        self.checkSessionFlash(response, 'No content')
 

	
 
    def test_add_file_into_git_missing_filename(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No filename')
 

	
 
    @parameterized.expand([
 
        ('/abs', 'foo'),
 
        ('../rel', 'foo'),
 
        ('file/../foo', 'foo'),
 
    ])
 
    def test_add_file_into_git_bad_filenames(self, location, filename):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'Location must be relative path and must not contain .. in path')
 

	
 
    @parameterized.expand([
 
        (1, '', 'foo.txt'),
 
        (2, 'dir', 'foo.rst'),
 
        (3, 'rel/dir', 'foo.bar'),
 
    ])
 
    def test_add_file_into_git(self, cnt, location, filename):
 
        self.log_user()
 
        repo = fixture.create_repo('commit-test-%s' % cnt, repo_type='git')
 
        repo = fixture.create_repo(u'commit-test-%s' % cnt, repo_type='git')
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Hg - EDIT
 
    def test_edit_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_edit_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_edit_file_view_not_on_branch_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo('test-edit-repo', repo_type='hg')
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
            response = self.app.get(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path='vcs/nodes.py'),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only edit files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_edit_file_view_commit_changes_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo('test-edit-repo', repo_type='hg')
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
            response = self.app.post(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path='vcs/nodes.py'),
 
                                     params={
 
                                        'content': "def py():\n print 'hello world'\n",
 
                                        'message': 'i commited',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully committed to vcs/nodes.py')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - edit
 
    def test_edit_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_edit_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_edit_file_view_not_on_branch_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo('test-edit-repo', repo_type='git')
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
            response = self.app.get(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path='vcs/nodes.py'),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only edit files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_edit_file_view_commit_changes_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo('test-edit-repo', repo_type='git')
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
            response = self.app.post(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path='vcs/nodes.py'),
 
                                     params={
 
                                        'content': "def py():\n print 'hello world'\n",
 
                                        'message': 'i commited',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully committed to vcs/nodes.py')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Hg - delete
 
    def test_delete_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_delete_home',
 
                                     repo_name=HG_REPO,
 
                                     revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_delete_file_view_not_on_branch_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo('test-delete-repo', repo_type='hg')
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
            response = self.app.get(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path='vcs/nodes.py'),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only delete files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_delete_file_view_commit_changes_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo('test-delete-repo', repo_type='hg')
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
            response = self.app.post(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path='vcs/nodes.py'),
 
                                     params={
 
                                        'message': 'i commited',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully deleted file vcs/nodes.py')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - delete
 
    def test_delete_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_delete_home',
 
                                     repo_name=HG_REPO,
 
                                     revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_delete_file_view_not_on_branch_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo('test-delete-repo', repo_type='git')
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
            response = self.app.get(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path='vcs/nodes.py'),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only delete files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_delete_file_view_commit_changes_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo('test-delete-repo', repo_type='git')
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
            response = self.app.post(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path='vcs/nodes.py'),
 
                                     params={
 
                                        'message': 'i commited',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully deleted file vcs/nodes.py')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
kallithea/tests/functional/test_forks.py
Show inline comments
 
@@ -56,97 +56,97 @@ class _BaseTestCase(object):
 
    def test_no_permissions_to_fork(self):
 
        usr = self.log_user(TEST_USER_REGULAR_LOGIN,
 
                            TEST_USER_REGULAR_PASS)['user_id']
 
        user_model = UserModel()
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 
        u = UserModel().get(usr)
 
        u.inherit_default_permissions = False
 
        Session().commit()
 
        # try create a fork
 
        repo_name = self.REPO
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), {'_authentication_token': self.authentication_token()}, status=403)
 

	
 
    def test_index_with_fork(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        fork_name = self.REPO_FORK
 
        description = 'fork of vcs test'
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': u'-1',
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': description,
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 

	
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (fork_name, fork_name)
 
        )
 

	
 
        # remove this fork
 
        response = self.app.post(url('delete_repo', repo_name=fork_name),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 

	
 
    def test_fork_create_into_group(self):
 
        self.log_user()
 
        group = fixture.create_repo_group('vc')
 
        group = fixture.create_repo_group(u'vc')
 
        group_id = group.group_id
 
        fork_name = self.REPO_FORK
 
        fork_name_full = 'vc/%s' % fork_name
 
        description = 'fork of vcs test'
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': group_id,
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': description,
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 
        repo = Repository.get_by_repo_name(fork_name_full)
 
        assert repo.fork.repo_name == self.REPO
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=fork_name_full))
 
        #test if we have a message that fork is ok
 
        self.checkSessionFlash(response,
 
                'Forked repository %s as <a href="/%s">%s</a>'
 
                % (repo_name, fork_name_full, fork_name_full))
 

	
 
        #test if the fork was created in the database
 
        fork_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == fork_name_full).one()
 

	
 
        self.assertEqual(fork_repo.repo_name, fork_name_full)
 
        self.assertEqual(fork_repo.fork.repo_name, repo_name)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=fork_name_full))
 
        response.mustcontain(fork_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 
        response.mustcontain('Fork of "<a href="/%s">%s</a>"' % (repo_name, repo_name))
 

	
 
        fixture.destroy_repo(fork_name_full)
 
        fixture.destroy_repo_group(group_id)
 

	
 
    def test_z_fork_create(self):
 
        self.log_user()
 
        fork_name = self.REPO_FORK
 
        description = 'fork of vcs test'
 
        repo_name = self.REPO
kallithea/tests/functional/test_home.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.meta import Session
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestHomeController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='home', action='index'))
 
        #if global permission is set
 
        response.mustcontain('Add Repository')
 
        # html in javascript variable:
 
        response.mustcontain('var data = {"totalRecords": %s' % len(Repository.getAll()))
 
        response.mustcontain(r'href=\"/%s\"' % HG_REPO)
 

	
 
        response.mustcontain(r'<span class="repotag">git')
 
        response.mustcontain(r'<i class=\"icon-globe\"')
 

	
 
        response.mustcontain("""fixes issue with having custom format for git-log""")
 
        response.mustcontain("""/%s/changeset/5f2c6ee195929b0be80749243c18121c9864a3b3""" % GIT_REPO)
 

	
 
        response.mustcontain("""disable security checks on hg clone for travis""")
 
        response.mustcontain("""/%s/changeset/96507bd11ecc815ebc6270fdf6db110928c09c1e""" % HG_REPO)
 

	
 
    def test_repo_summary_with_anonymous_access_disabled(self):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='summary',
 
                                        action='index', repo_name=HG_REPO),
 
                                        status=302)
 
            assert 'login' in response.location
 

	
 
    def test_index_with_anonymous_access_disabled(self):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='home', action='index'),
 
                                    status=302)
 
            assert 'login' in response.location
 

	
 
    def test_index_page_on_groups(self):
 
        self.log_user()
 
        gr = fixture.create_repo_group('gr1')
 
        fixture.create_repo(name='gr1/repo_in_group', repo_group=gr)
 
        response = self.app.get(url('repos_group_home', group_name='gr1'))
 
        gr = fixture.create_repo_group(u'gr1')
 
        fixture.create_repo(name=u'gr1/repo_in_group', repo_group=gr)
 
        response = self.app.get(url('repos_group_home', group_name=u'gr1'))
 

	
 
        try:
 
            response.mustcontain("gr1/repo_in_group")
 
            response.mustcontain(u"gr1/repo_in_group")
 
        finally:
 
            RepoModel().delete('gr1/repo_in_group')
 
            RepoGroupModel().delete(repo_group='gr1', force_delete=True)
 
            RepoModel().delete(u'gr1/repo_in_group')
 
            RepoGroupModel().delete(repo_group=u'gr1', force_delete=True)
 
            Session().commit()
kallithea/tests/functional/test_login.py
Show inline comments
 
@@ -305,98 +305,98 @@ class TestLoginController(TestController
 
    def test_register_ok(self):
 
        username = 'test_regular4'
 
        password = 'qweqwe'
 
        email = 'user4@example.com'
 
        name = 'testname'
 
        lastname = 'testlastname'
 

	
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': username,
 
                                             'password': password,
 
                                             'password_confirmation': password,
 
                                             'email': email,
 
                                             'firstname': name,
 
                                             'lastname': lastname,
 
                                             'admin': True})  # This should be overriden
 
        self.assertEqual(response.status, '302 Found')
 
        self.checkSessionFlash(response, 'You have successfully registered into Kallithea')
 

	
 
        ret = Session().query(User).filter(User.username == 'test_regular4').one()
 
        self.assertEqual(ret.username, username)
 
        self.assertEqual(check_password(password, ret.password), True)
 
        self.assertEqual(ret.email, email)
 
        self.assertEqual(ret.name, name)
 
        self.assertEqual(ret.lastname, lastname)
 
        self.assertNotEqual(ret.api_key, None)
 
        self.assertEqual(ret.admin, False)
 

	
 
    #==========================================================================
 
    # PASSWORD RESET
 
    #==========================================================================
 

	
 
    def test_forgot_password_wrong_mail(self):
 
        bad_email = 'username%wrongmail.org'
 
        response = self.app.post(
 
                        url(controller='login', action='password_reset'),
 
                            {'email': bad_email, }
 
        )
 

	
 
        response.mustcontain('An email address must contain a single @')
 

	
 
    def test_forgot_password(self):
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset'))
 
        self.assertEqual(response.status, '200 OK')
 

	
 
        username = 'test_password_reset_1'
 
        password = 'qweqwe'
 
        email = 'username@example.com'
 
        name = 'passwd'
 
        lastname = 'reset'
 
        name = u'passwd'
 
        lastname = u'reset'
 
        timestamp = int(time.time())
 

	
 
        new = User()
 
        new.username = username
 
        new.password = password
 
        new.email = email
 
        new.name = name
 
        new.lastname = lastname
 
        new.api_key = generate_api_key()
 
        Session().add(new)
 
        Session().commit()
 

	
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset'),
 
                                 {'email': email, })
 

	
 
        self.checkSessionFlash(response, 'A password reset confirmation code has been sent')
 

	
 
        response = response.follow()
 

	
 
        # BAD TOKEN
 

	
 
        token = "bad"
 

	
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset_confirmation'),
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': token,
 
                                 })
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('Invalid password reset token')
 

	
 
        # GOOD TOKEN
 

	
 
        # TODO: The token should ideally be taken from the mail sent
 
        # above, instead of being recalculated.
 

	
 
        token = UserModel().get_reset_password_token(
 
            User.get_by_username(username), timestamp, self.authentication_token())
 

	
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset_confirmation',
 
                                    email=email,
 
                                    timestamp=timestamp,
 
                                    token=token))
kallithea/tests/functional/test_my_account.py
Show inline comments
 
@@ -111,115 +111,115 @@ class TestMyAccountController(TestContro
 
        user_id = usr.user_id
 
        self.log_user(username=self.test_user_1, password='qweqwe')
 

	
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update({'extern_type': 'internal'})
 
        params.update({'extern_name': self.test_user_1})
 
        params.update({'_authentication_token': self.authentication_token()})
 

	
 
        params.update(attrs)
 
        response = self.app.post(url('my_account'), params)
 

	
 
        self.checkSessionFlash(response,
 
                               'Your account was updated successfully')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        params['last_login'] = updated_params['last_login']
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            #cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            #cannot update this via form, expected value is original one
 
            params['extern_name'] = str(user_id)
 
        if name == 'active':
 
            #my account cannot deactivate account
 
            params['active'] = True
 
        if name == 'admin':
 
            #my account cannot make you an admin !
 
            params['admin'] = False
 

	
 
        params.pop('_authentication_token')
 
        self.assertEqual(params, updated_params)
 

	
 
    def test_my_account_update_err_email_exists(self):
 
        self.log_user()
 

	
 
        new_email = TEST_USER_REGULAR_EMAIL  # already existing email
 
        response = self.app.post(url('my_account'),
 
                                params=dict(
 
                                    username=TEST_USER_ADMIN_LOGIN,
 
                                    new_password=TEST_USER_ADMIN_PASS,
 
                                    password_confirmation='test122',
 
                                    firstname='NewName',
 
                                    lastname='NewLastname',
 
                                    firstname=u'NewName',
 
                                    lastname=u'NewLastname',
 
                                    email=new_email,
 
                                    _authentication_token=self.authentication_token())
 
                                )
 

	
 
        response.mustcontain('This email address is already in use')
 

	
 
    def test_my_account_update_err(self):
 
        self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 

	
 
        new_email = 'newmail.pl'
 
        response = self.app.post(url('my_account'),
 
                                 params=dict(
 
                                            username=TEST_USER_ADMIN_LOGIN,
 
                                            new_password=TEST_USER_ADMIN_PASS,
 
                                            password_confirmation='test122',
 
                                            firstname='NewName',
 
                                            lastname='NewLastname',
 
                                            firstname=u'NewName',
 
                                            lastname=u'NewLastname',
 
                                            email=new_email,
 
                                            _authentication_token=self.authentication_token()))
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        from kallithea.model import validators
 
        msg = validators.ValidUsername(edit=False, old_data={}) \
 
                ._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': TEST_USER_ADMIN_LOGIN})
 
        response.mustcontain(msg)
 

	
 
    def test_my_account_api_keys(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.get(url('my_account_api_keys'))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @parameterized.expand([
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_my_account_add_api_keys(self, desc, lifetime):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(usr['user_id'])
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_my_account_remove_api_key(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        #now delete our key
 
        keys = UserApiKeys.query().all()
kallithea/tests/functional/test_pullrequests.py
Show inline comments
 
@@ -101,103 +101,103 @@ class TestPullrequestsController(TestCon
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': invalid_user_id,
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_id)
 

	
 
    def test_edit_with_invalid_reviewer(self):
 
        invalid_user_id = 99999
 
        self.log_user()
 
        # create a valid pull request
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {
 
                                  'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:default:default',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 }
 
                                )
 
        self.assertEqual(response.status, '302 Found')
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search('/pull-request/(\d+)/', response.location)
 
        self.assertNotEqual(m, None)
 
        pull_request_id = m.group(1)
 

	
 
        # edit it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': invalid_user_id,
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_id)
 

	
 
class TestPullrequestsGetRepoRefs(TestController):
 

	
 
    def setUp(self):
 
        self.main = fixture.create_repo('main', repo_type='hg')
 
        self.main = fixture.create_repo(u'main', repo_type='hg')
 
        Session.add(self.main)
 
        Session.commit()
 
        self.c = PullrequestsController()
 

	
 
    def tearDown(self):
 
        fixture.destroy_repo('main')
 
        fixture.destroy_repo(u'main')
 
        Session.commit()
 
        Session.remove()
 

	
 
    def test_repo_refs_empty_repo(self):
 
        # empty repo with no commits, no branches, no bookmarks, just one tag
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        self.assertEqual(default, 'tag:null:0000000000000000000000000000000000000000')
 

	
 
    def test_repo_refs_one_commit_no_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        self.assertEqual(default, 'branch:default:%s' % cs0.raw_id)
 
        self.assertIn(([('branch:default:%s' % cs0.raw_id, 'default (current tip)')],
 
                'Branches'), refs)
 

	
 
    def test_repo_refs_one_commit_rev_hint(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance, rev=cs0.raw_id)
 
        expected = 'branch:default:%s' % cs0.raw_id
 
        self.assertEqual(default, expected)
 
        self.assertIn(([(expected, 'default (current tip)')], 'Branches'), refs)
 

	
 
    def test_repo_refs_two_commits_no_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.main.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        self.assertEqual(default, expected)
 
        self.assertIn(([(expected, 'default (current tip)')], 'Branches'), refs)
 

	
 
    def test_repo_refs_two_commits_rev_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.main.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
kallithea/tests/functional/test_summary.py
Show inline comments
 
@@ -37,105 +37,105 @@ class TestSummaryController(TestControll
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
        # clone url...
 
        response.mustcontain('''id="clone_url" readonly="readonly" value="http://%s@localhost:80/%s"''' % (TEST_USER_ADMIN_LOGIN, HG_REPO))
 
        response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://%s@localhost:80/_%s"''' % (TEST_USER_ADMIN_LOGIN, ID))
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name=GIT_REPO))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">git"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
        # clone url...
 
        response.mustcontain('''id="clone_url" readonly="readonly" value="http://%s@localhost:80/%s"''' % (TEST_USER_ADMIN_LOGIN, GIT_REPO))
 
        response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://%s@localhost:80/_%s"''' % (TEST_USER_ADMIN_LOGIN, ID))
 

	
 
    def test_index_by_id_hg(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(HG_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">hg"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
    def test_index_by_repo_having_id_path_in_name_hg(self):
 
        self.log_user()
 
        fixture.create_repo(name='repo_1')
 
        fixture.create_repo(name=u'repo_1')
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='repo_1'))
 

	
 
        try:
 
            response.mustcontain("repo_1")
 
        finally:
 
            RepoModel().delete(Repository.get_by_repo_name('repo_1'))
 
            RepoModel().delete(Repository.get_by_repo_name(u'repo_1'))
 
            Session().commit()
 

	
 
    def test_index_by_id_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">git"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
    def _enable_stats(self, repo):
 
        r = Repository.get_by_repo_name(repo)
 
        r.enable_statistics = True
 
        Session().add(r)
 
        Session().commit()
 

	
 
    def test_index_trending(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(HG_REPO)
 

	
 
        ScmModel().mark_for_invalidation(HG_REPO)
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=HG_REPO))
 
        response.mustcontain(
 
            '[["py", {"count": 68, "desc": ["Python"]}], '
 
            '["rst", {"count": 16, "desc": ["Rst"]}], '
 
            '["css", {"count": 2, "desc": ["Css"]}], '
 
            '["sh", {"count": 2, "desc": ["Bash"]}], '
 
            '["yml", {"count": 1, "desc": ["Yaml"]}], '
 
            '["makefile", {"count": 1, "desc": ["Makefile", "Makefile"]}], '
 
            '["js", {"count": 1, "desc": ["Javascript"]}], '
 
            '["cfg", {"count": 1, "desc": ["Ini"]}], '
 
            '["ini", {"count": 1, "desc": ["Ini"]}], '
 
            '["html", {"count": 1, "desc": ["EvoqueHtml", "Html"]}]];'
 
        )
 

	
 
    def test_index_statistics(self):
 
        self.log_user()
 
        #codes stats
kallithea/tests/models/common.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 

	
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.db import RepoGroup, Repository, User
 
from kallithea.model.user import UserModel
 

	
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.meta import Session
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
def _destroy_project_tree(test_u1_id):
 
    Session.remove()
 
    repo_group = RepoGroup.get_by_group_name(group_name='g0')
 
    repo_group = RepoGroup.get_by_group_name(group_name=u'g0')
 
    for el in reversed(repo_group.recursive_groups_and_repos()):
 
        if isinstance(el, Repository):
 
            RepoModel().delete(el)
 
        elif isinstance(el, RepoGroup):
 
            RepoGroupModel().delete(el, force_delete=True)
 

	
 
    u = User.get(test_u1_id)
 
    Session().delete(u)
 
    Session().commit()
 

	
 

	
 
def _create_project_tree():
 
    """
 
    Creates a tree of groups and repositories to test permissions
 

	
 
    structure
 
     [g0] - group `g0` with 3 subgroups
 
     |
 
     |__[g0_1] group g0_1 with 2 groups 0 repos
 
     |  |
 
     |  |__[g0_1_1] group g0_1_1 with 1 group 2 repos
 
     |  |   |__<g0/g0_1/g0_1_1/g0_1_1_r1>
 
     |  |   |__<g0/g0_1/g0_1_1/g0_1_1_r2>
 
     |  |__<g0/g0_1/g0_1_r1>
 
     |
 
     |__[g0_2] 2 repos
 
     |  |
 
     |  |__<g0/g0_2/g0_2_r1>
 
     |  |__<g0/g0_2/g0_2_r2>
 
     |
 
     |__[g0_3] 1 repo
 
        |
 
        |_<g0/g0_3/g0_3_r1>
 
        |_<g0/g0_3/g0_3_r2_private>
 

	
 
    """
 
    test_u1 = UserModel().create_or_update(
 
        username=u'test_u1', password=u'qweqwe',
 
        email=u'test_u1@example.com', firstname=u'test_u1', lastname=u'test_u1'
 
    )
 
    g0 = fixture.create_repo_group('g0')
 
    g0_1 = fixture.create_repo_group('g0_1', group_parent_id=g0)
 
    g0_1_1 = fixture.create_repo_group('g0_1_1', group_parent_id=g0_1)
 
    g0_1_1_r1 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r1', repo_group=g0_1_1)
 
    g0_1_1_r2 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r2', repo_group=g0_1_1)
 
    g0_1_r1 = fixture.create_repo('g0/g0_1/g0_1_r1', repo_group=g0_1)
 
    g0_2 = fixture.create_repo_group('g0_2', group_parent_id=g0)
 
    g0_2_r1 = fixture.create_repo('g0/g0_2/g0_2_r1', repo_group=g0_2)
 
    g0_2_r2 = fixture.create_repo('g0/g0_2/g0_2_r2', repo_group=g0_2)
 
    g0_3 = fixture.create_repo_group('g0_3', group_parent_id=g0)
 
    g0_3_r1 = fixture.create_repo('g0/g0_3/g0_3_r1', repo_group=g0_3)
 
    g0_3_r2_private = fixture.create_repo('g0/g0_3/g0_3_r1_private',
 
    g0 = fixture.create_repo_group(u'g0')
 
    g0_1 = fixture.create_repo_group(u'g0_1', group_parent_id=g0)
 
    g0_1_1 = fixture.create_repo_group(u'g0_1_1', group_parent_id=g0_1)
 
    g0_1_1_r1 = fixture.create_repo(u'g0/g0_1/g0_1_1/g0_1_1_r1', repo_group=g0_1_1)
 
    g0_1_1_r2 = fixture.create_repo(u'g0/g0_1/g0_1_1/g0_1_1_r2', repo_group=g0_1_1)
 
    g0_1_r1 = fixture.create_repo(u'g0/g0_1/g0_1_r1', repo_group=g0_1)
 
    g0_2 = fixture.create_repo_group(u'g0_2', group_parent_id=g0)
 
    g0_2_r1 = fixture.create_repo(u'g0/g0_2/g0_2_r1', repo_group=g0_2)
 
    g0_2_r2 = fixture.create_repo(u'g0/g0_2/g0_2_r2', repo_group=g0_2)
 
    g0_3 = fixture.create_repo_group(u'g0_3', group_parent_id=g0)
 
    g0_3_r1 = fixture.create_repo(u'g0/g0_3/g0_3_r1', repo_group=g0_3)
 
    g0_3_r2_private = fixture.create_repo(u'g0/g0_3/g0_3_r1_private',
 
                                          repo_group=g0_3, repo_private=True)
 
    return test_u1
 

	
 

	
 
def expected_count(group_name, objects=False):
 
    repo_group = RepoGroup.get_by_group_name(group_name=group_name)
 
    objs = repo_group.recursive_groups_and_repos()
 
    if objects:
 
        return objs
 
    return len(objs)
 

	
 

	
 
def _check_expected_count(items, repo_items, expected):
 
    should_be = len(items + repo_items)
 
    there_are = len(expected)
 
    assert should_be == there_are, ('%s != %s' % ((items + repo_items), expected))
 

	
 

	
 
def check_tree_perms(obj_name, repo_perm, prefix, expected_perm):
 
    assert repo_perm == expected_perm, ('obj:`%s` got perm:`%s` should:`%s`'
 
                                    % (obj_name, repo_perm, expected_perm))
 

	
 

	
 
def _get_perms(filter_='', recursive=None, key=None, test_u1_id=None):
 
    test_u1 = AuthUser(user_id=test_u1_id)
 
    for k, v in test_u1.permissions[key].items():
 
        if recursive in ['all', 'repos', 'groups'] and k.startswith(filter_):
 
            yield k, v
 
        elif recursive in ['none']:
 
            if k == filter_:
 
                yield k, v
kallithea/tests/models/test_permissions.py
Show inline comments
 
@@ -60,292 +60,292 @@ class TestPermissions(BaseTestCase):
 
            RepoGroupModel().delete(self.g2.group_id)
 

	
 
        if hasattr(self, 'ug1'):
 
            UserGroupModel().delete(self.ug1, force=True)
 
        if hasattr(self, 'ug2'):
 
            UserGroupModel().delete(self.ug2, force=True)
 

	
 
        Session().commit()
 

	
 
    def test_default_perms_set(self):
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.create.repository', 'repository.read',
 
                           'hg.register.manual_activate']),
 
            'repositories': {HG_REPO: 'repository.read'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
 
                                          perm=new_perm)
 
        Session().commit()
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm)
 

	
 
    def test_default_admin_perms_set(self):
 
        a1_auth = AuthUser(user_id=self.a1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.admin', 'hg.create.write_on_repogroup.true']),
 
            'repositories': {HG_REPO: 'repository.admin'}
 
        }
 
        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
 
                                          perm=new_perm)
 
        Session().commit()
 
        # cannot really downgrade admins permissions !? they still gets set as
 
        # admin !
 
        u1_auth = AuthUser(user_id=self.a1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 

	
 
    def test_default_group_perms(self):
 
        self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
 
        self.g1 = fixture.create_repo_group(u'test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group(u'test2', skip_if_exists=True)
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
 
            'global': set(Permission.DEFAULT_USER_PERMISSIONS),
 
            'repositories': {HG_REPO: 'repository.read'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         perms['global'])
 

	
 
    def test_default_admin_group_perms(self):
 
        self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
 
        self.g1 = fixture.create_repo_group(u'test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group(u'test2', skip_if_exists=True)
 
        a1_auth = AuthUser(user_id=self.a1.user_id)
 
        perms = {
 
            'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
 
            'global': set(['hg.admin', 'hg.create.write_on_repogroup.true']),
 
            'repositories': {HG_REPO: 'repository.admin'}
 
        }
 

	
 
        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_propagated_permission_from_users_group_by_explicit_perms_exist(self):
 
        # make group
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 

	
 
        # set permission to lower
 
        new_perm = 'repository.none'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm)
 

	
 
        # grant perm for group this should not override permission from user
 
        # since it has explicitly set
 
        new_perm_gr = 'repository.write'
 
        RepoModel().grant_user_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_gr)
 
        # check perms
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.create.repository', 'repository.read',
 
                           'hg.register.manual_activate']),
 
            'repositories': {HG_REPO: 'repository.read'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_propagated_permission_from_users_group(self):
 
        # make group
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u3)
 

	
 
        # grant perm for group this should override default permission from user
 
        new_perm_gr = 'repository.write'
 
        RepoModel().grant_user_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_gr)
 
        # check perms
 
        u3_auth = AuthUser(user_id=self.u3.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.create.repository', 'repository.read',
 
                           'hg.register.manual_activate']),
 
            'repositories': {HG_REPO: 'repository.read'}
 
        }
 
        self.assertEqual(u3_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_gr)
 
        self.assertEqual(u3_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_propagated_permission_from_users_group_lower_weight(self):
 
        # make group
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        # add user to group
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 

	
 
        # set permission to lower
 
        new_perm_h = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
 
                                          perm=new_perm_h)
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_h)
 

	
 
        # grant perm for group this should NOT override permission from user
 
        # since it's lower than granted
 
        new_perm_l = 'repository.read'
 
        RepoModel().grant_user_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_l)
 
        # check perms
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set(['hg.create.repository', 'repository.read',
 
                           'hg.register.manual_activate']),
 
            'repositories': {HG_REPO: 'repository.write'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_h)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_repo_in_group_permissions(self):
 
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group('group2', skip_if_exists=True)
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group(u'group2', skip_if_exists=True)
 
        # both perms should be read !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        #Change perms to none for both groups
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1,
 
                                               user=self.anon,
 
                                               perm='group.none')
 
        RepoGroupModel().grant_user_permission(repo_group=self.g2,
 
                                               user=self.anon,
 
                                               perm='group.none')
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        # add repo to group
 
        name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
 
        self.test_repo = fixture.create_repo(name=name,
 
                                             repo_type='hg',
 
                                             repo_group=self.g1,
 
                                             cur_user=self.u1,)
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        #grant permission for u2 !
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1, user=self.u2,
 
                                               perm='group.read')
 
        RepoGroupModel().grant_user_permission(repo_group=self.g2, user=self.u2,
 
                                               perm='group.read')
 
        Session().commit()
 
        self.assertNotEqual(self.u1, self.u2)
 
        #u1 and anon should have not change perms while u2 should !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        u2_auth = AuthUser(user_id=self.u2.user_id)
 
        self.assertEqual(u2_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
    def test_repo_group_user_as_user_group_member(self):
 
        # create Group1
 
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read'})
 

	
 
        # set default permission to none
 
        RepoGroupModel().grant_user_permission(repo_group=self.g1,
 
                                               user=self.anon,
 
                                               perm='group.none')
 
        # make group
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        # add user to group
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 
        Session().commit()
 

	
 
        # check if user is in the group
 
        membrs = [x.user_id for x in UserGroupModel().get(self.ug1.users_group_id).members]
 
        self.assertEqual(membrs, [self.u1.user_id])
 
        # add some user to that group
 

	
 
        # check his permissions
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.none'})
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.none'})
 

	
 
        # grant ug1 read permissions for
 
        RepoGroupModel().grant_user_group_permission(repo_group=self.g1,
 
                                                      group_name=self.ug1,
 
                                                      perm='group.read')
 
        Session().commit()
 
        # check if the
 
        obj = Session().query(UserGroupRepoGroupToPerm) \
 
            .filter(UserGroupRepoGroupToPerm.group == self.g1) \
 
            .filter(UserGroupRepoGroupToPerm.users_group == self.ug1) \
 
            .scalar()
 
        self.assertEqual(obj.permission.permission_name, 'group.read')
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.none'})
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read'})
 

	
 
    def test_inherited_permissions_from_default_on_user_enabled(self):
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 
        # make sure inherit flag is turned on
 
@@ -396,328 +396,328 @@ class TestPermissions(BaseTestCase):
 
        user_model.grant_perm(self.u1, 'hg.fork.none')
 

	
 
        # make sure inherit flag is turned off
 
        self.u1.inherit_default_permissions = False
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have non inherited permissions from he's
 
        # explicitly set permissions
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true']))
 

	
 
    def test_non_inherited_permissions_from_default_on_user_disabled(self):
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        #enable global perms on specific user
 
        user_model.revoke_perm(self.u1, 'hg.create.none')
 
        user_model.grant_perm(self.u1, 'hg.create.repository')
 
        user_model.revoke_perm(self.u1, 'hg.fork.none')
 
        user_model.grant_perm(self.u1, 'hg.fork.repository')
 

	
 
        # make sure inherit flag is turned off
 
        self.u1.inherit_default_permissions = False
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have non inherited permissions from he's
 
        # explicitly set permissions
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true']))
 

	
 
    def test_inactive_user_group_does_not_affect_global_permissions(self):
 
        # Issue #138: Inactive User Groups affecting permissions
 
        # Add user to inactive user group, set specific permissions on user
 
        # group and disable inherit-from-default. User permissions should still
 
        # inherit from default.
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # enable fork and create on user group
 
        user_group_model.revoke_perm(self.ug1, perm='hg.create.none')
 
        user_group_model.grant_perm(self.ug1, perm='hg.create.repository')
 
        user_group_model.revoke_perm(self.ug1, perm='hg.fork.none')
 
        user_group_model.grant_perm(self.ug1, perm='hg.fork.repository')
 

	
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true']))
 

	
 
    def test_inactive_user_group_does_not_affect_global_permissions_inverse(self):
 
        # Issue #138: Inactive User Groups affecting permissions
 
        # Add user to inactive user group, set specific permissions on user
 
        # group and disable inherit-from-default. User permissions should still
 
        # inherit from default.
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # disable fork and create on user group
 
        user_group_model.revoke_perm(self.ug1, perm='hg.create.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.create.none')
 
        user_group_model.revoke_perm(self.ug1, perm='hg.fork.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.fork.none')
 

	
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true']))
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name='myownrepo',
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable admin access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.admin')
 
        # enable only write access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.write')
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name='myownrepo',
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable only write access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.write')
 
        # enable admin access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable admin access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.admin')
 
        # enable only write access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.write'})
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable only write access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.write')
 
        # enable admin access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.admin'})
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group('G2')
 
        self.ug2 = fixture.create_user_group(u'G2')
 

	
 
        # enable admin access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.admin')
 
        # enable only write access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['user_groups'][u'G1'], u'usergroup.read')
 
        self.assertEqual(u1_auth.permissions['user_groups'][u'G2'], u'usergroup.write')
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group('G2')
 
        self.ug2 = fixture.create_user_group(u'G2')
 

	
 
        # enable only write access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.write')
 
        # enable admin access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['user_groups'][u'G1'], u'usergroup.read')
 
        self.assertEqual(u1_auth.permissions['user_groups'][u'G2'], u'usergroup.admin')
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_group(self):
 
        #create repo as USER,
 
        self.test_repo = fixture.create_repo(name='myownrepo',
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        #he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 
        #set his permission as user group, he should still be admin
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                 group_name=self.ug1,
 
                                                 perm='repository.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_others(self):
 
        #create repo as USER,
 
        self.test_repo = fixture.create_repo(name='myownrepo',
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        #he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 
        #set his permission as user, he should still be admin
 
        RepoModel().grant_user_permission(self.test_repo, user=self.u1,
 
                                          perm='repository.none')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 

	
 
    def _test_def_perm_equal(self, user, change_factor=0):
 
        perms = UserToPerm.query() \
 
                .filter(UserToPerm.user == user) \
 
                .all()
 
        self.assertEqual(len(perms),
 
                         len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor,
 
                         msg=perms)
 

	
 
    def test_set_default_permissions(self):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
    def test_set_default_permissions_after_one_is_missing(self):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 
        #now we delete one, it should be re-created after another call
 
        perms = UserToPerm.query() \
 
                .filter(UserToPerm.user == self.u1) \
 
                .all()
 
        Session().delete(perms[0])
 
        Session().commit()
 

	
 
        self._test_def_perm_equal(user=self.u1, change_factor=-1)
 

	
 
        #create missing one !
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
    @parameterized.expand([
 
        ('repository.read', 'repository.none'),
 
        ('group.read', 'group.none'),
 
        ('usergroup.read', 'usergroup.none'),
 
        ('hg.create.repository', 'hg.create.none'),

Changeset was too big and was cut off... Show full diff anyway

0 comments (0 inline, 0 general)