Changeset - e527cc2ce8dc
[Not reviewed]
default
0 43 0
Mads Kiilerich - 6 years ago 2020-01-02 00:44:56
mads@kiilerich.com
Grafted from: 11ff671647b1
cleanup: get rid of most "import *"

Apply script generated with the following hack:
(
hg loc '*.py'|xargs pyflakes-2 | sed -rn "s/([^:]*):.*'(.*)' may be undefined, or defined from star imports.*/sed -ri 's,\\\\<\2\\\\>([^=]|$),XXXX.\2\\\\1,g' \1/gp" | sort -u
hg loc '*.py'|xargs pyflakes-2 | sed -rn "s/([^:]*):.* undefined name '(.*)'$/sed -ri 's,\\\\<\2\\\\>([^=]|$),XXXX.\2\\\\1,g' \1/gp" | sort -u
hg loc '*.py'|xargs pyflakes-2 | sed -rn "s/([^:]*):.*'(from .*)\.([^.]*) import \*' used.*/sed -ri 's,\\\\<XXXX\\\\.,\3.,g' \1/gp" | sort -u
hg loc '*.py'|xargs pyflakes-2 | sed -rn "s/([^:]*):.*'(from .*)\.([^.]*) import \*' used.*/sed -ri 's,\2\\\\.\3 .*,\2 import \3,g' \1/gp" | sort -u
) | grep -v kallithea/bin/kallithea_cli_ishell.py > fix2.sh
17 files changed:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_cli_ishell.py
Show inline comments
 
@@ -22,13 +22,13 @@ Original author and date, and relevant c
 

	
 
from __future__ import print_function
 

	
 
import sys
 

	
 
import kallithea.bin.kallithea_cli_base as cli_base
 
from kallithea.model.db import *
 
from kallithea.model.db import *  # these names will be directly available in the IPython shell
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
def ishell():
 
    """Interactive shell for Kallithea."""
 
    try:
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -31,13 +31,13 @@ from kallithea.model.gist import GistMod
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = u'test_user_group'
 
TEST_REPO_GROUP = u'test_repo_group'
 
@@ -72,32 +72,32 @@ def api_call(test_obj, params):
 
                                 params=params)
 
    return response
 

	
 

	
 
## helpers
 
def make_user_group(name=TEST_USER_GROUP):
 
    gr = fixture.create_user_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    gr = fixture.create_user_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
 
    UserGroupModel().add_user_to_group(user_group=gr,
 
                                       user=TEST_USER_ADMIN_LOGIN)
 
                                       user=base.TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
def make_repo_group(name=TEST_REPO_GROUP):
 
    gr = fixture.create_repo_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    gr = fixture.create_repo_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
class _BaseTestApi(object):
 
    REPO = None
 
    REPO_TYPE = None
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        cls.usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        cls.usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        cls.apikey = cls.usr.api_key
 
        cls.test_user = UserModel().create_or_update(
 
            username='test-api',
 
            password='test',
 
            email='test@example.com',
 
            firstname=u'first',
 
@@ -227,16 +227,16 @@ class _BaseTestApi(object):
 
            ret_all.append(jsonify(ret))
 
        expected = ret_all
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
                                  userid=base.TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
@@ -249,13 +249,13 @@ class _BaseTestApi(object):
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
@@ -391,28 +391,28 @@ class _BaseTestApi(object):
 

	
 
        expected = "repository `%s` does not exist" % (self.REPO,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_existing_user(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN,
 
                                  username=base.TEST_USER_ADMIN_LOGIN,
 
                                  email='test@example.com',
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
 
        expected = "user `%s` already exist" % base.TEST_USER_ADMIN_LOGIN
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user_with_existing_email(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN + 'new',
 
                                  email=TEST_USER_REGULAR_EMAIL,
 
                                  username=base.TEST_USER_ADMIN_LOGIN + 'new',
 
                                  email=base.TEST_USER_REGULAR_EMAIL,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
 
        expected = "email `%s` already exist" % base.TEST_USER_REGULAR_EMAIL
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user(self):
 
        username = 'test_new_api_user'
 
        email = username + "@example.com"
 

	
 
@@ -522,13 +522,13 @@ class _BaseTestApi(object):
 
        response = api_call(self, params)
 
        ret = 'failed to delete user ID:%s %s' % (usr.user_id,
 
                                                  usr.username)
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,expected', [
 
    @base.parametrize('name,expected', [
 
        ('firstname', 'new_username'),
 
        ('lastname', 'new_username'),
 
        ('email', 'new_username'),
 
        ('admin', True),
 
        ('admin', False),
 
        ('extern_type', 'ldap'),
 
@@ -555,36 +555,36 @@ class _BaseTestApi(object):
 
        }
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_no_changed_params(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
                                  userid=base.TEST_USER_ADMIN_LOGIN)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
                usr.user_id, base.TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_by_user_id(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
                usr.user_id, base.TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_default_user(self):
 
@@ -595,13 +595,13 @@ class _BaseTestApi(object):
 
        response = api_call(self, params)
 
        expected = 'editing default user is forbidden'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'update_user', crash)
 
    def test_api_update_user_when_exception_happens(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = 'failed to update user `%s`' % usr.user_id
 
@@ -655,13 +655,13 @@ class _BaseTestApi(object):
 
                                  with_revision_names=True,
 
                                  with_pullrequests=True)
 
        response = api_call(self, params)
 
        assert u"v0.2.0" in response.json[u'result'][u'tags']
 
        assert u'pull_requests' in response.json[u'result']
 

	
 
    @parametrize('grant_perm', [
 
    @base.parametrize('grant_perm', [
 
        ('repository.admin'),
 
        ('repository.write'),
 
        ('repository.read'),
 
    ])
 
    def test_api_get_repo_by_non_admin(self, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
@@ -752,13 +752,13 @@ class _BaseTestApi(object):
 
            repo.get_api_data()
 
            for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN)
 
        ])
 

	
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parametrize('name,ret_type', [
 
    @base.parametrize('name,ret_type', [
 
        ('all', 'all'),
 
        ('dirs', 'dirs'),
 
        ('files', 'files'),
 
    ])
 
    def test_api_get_repo_nodes(self, name, ret_type):
 
        rev = 'tip'
 
@@ -807,13 +807,13 @@ class _BaseTestApi(object):
 
        response = api_call(self, params)
 

	
 
        expected = ('ret_type must be one of %s'
 
                    % (','.join(sorted(['files', 'dirs', 'all']))))
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,ret_type,grant_perm', [
 
    @base.parametrize('name,ret_type,grant_perm', [
 
        ('all', 'all', 'repository.write'),
 
        ('dirs', 'dirs', 'repository.admin'),
 
        ('files', 'files', 'repository.read'),
 
    ])
 
    def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
@@ -838,13 +838,13 @@ class _BaseTestApi(object):
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_create_repo(self):
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        assert repo is not None
 
@@ -854,24 +854,24 @@ class _BaseTestApi(object):
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    @parametrize('repo_name', [
 
    @base.parametrize('repo_name', [
 
        u'',
 
        u'.',
 
        u'..',
 
        u':',
 
        u'/',
 
        u'<test>',
 
    ])
 
    def test_api_create_repo_bad_names(self, repo_name):
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 
        if repo_name == '/':
 
            expected = "repo group `` not found"
 
            self._compare_error(id_, expected, given=response.body)
 
@@ -880,17 +880,17 @@ class _BaseTestApi(object):
 
            self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_clone_uri_local(self):
 
        # cloning from local repos was a mis-feature - it would bypass access control
 
        # TODO: introduce other test coverage of actual remote cloning
 
        clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
 
        clone_uri = os.path.join(base.TESTS_TMP_PATH, self.REPO)
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
                                  clone_uri=clone_uri,
 
        )
 
        response = api_call(self, params)
 
        expected = "failed to create repository `%s`" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 
@@ -900,26 +900,26 @@ class _BaseTestApi(object):
 
        repo_group_name = u'my_gr'
 
        repo_name = u'%s/api-repo' % repo_group_name
 

	
 
        # repo creation can no longer also create repo group
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = u'repo group `%s` not found' % repo_group_name
 
        self._compare_error(id_, expected, given=response.body)
 
        assert RepoModel().get_by_repo_name(repo_name) is None
 

	
 
        # create group before creating repo
 
        rg = fixture.create_repo_group(repo_group_name)
 
        Session().commit()
 

	
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
@@ -1033,44 +1033,44 @@ class _BaseTestApi(object):
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_exists(self):
 
        repo_name = self.REPO
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = "repo `%s` already exist" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dot_dot(self):
 
        # it is only possible to create repositories in existing repo groups - and '..' can't be used
 
        group_name = '%s/..' % TEST_REPO_GROUP
 
        repo_name = '%s/%s' % (group_name, 'could-be-outside')
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = u'repo group `%s` not found' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(RepoModel, 'create', crash)
 
    def test_api_create_repo_exception_occurred(self):
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = 'failed to create repository `%s`' % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('changing_attr,updates', [
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
    @base.parametrize('changing_attr,updates', [
 
        ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': u'new description'}),
 
        ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
 
        ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
@@ -1103,14 +1103,14 @@ class _BaseTestApi(object):
 
                self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
 

	
 
    @parametrize('changing_attr,updates', [
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
    @base.parametrize('changing_attr,updates', [
 
        ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': u'new description'}),
 
        ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
 
        ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
@@ -1177,13 +1177,13 @@ class _BaseTestApi(object):
 

	
 
    @mock.patch.object(RepoModel, 'update', crash)
 
    def test_api_update_repo_exception_occurred(self):
 
        repo_name = u'api_update_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
 
                                  repoid=repo_name, owner=base.TEST_USER_ADMIN_LOGIN,)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update repo `%s`' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
@@ -1234,13 +1234,13 @@ class _BaseTestApi(object):
 
    def test_api_update_repo_regular_user_change_owner(self):
 
        repo_name = u'admin_owned'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        updates = {'owner': TEST_USER_ADMIN_LOGIN}
 
        updates = {'owner': base.TEST_USER_ADMIN_LOGIN}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'Only Kallithea admin can specify `owner` param'
 
            self._compare_error(id_, expected, given=response.body)
 
@@ -1311,13 +1311,13 @@ class _BaseTestApi(object):
 

	
 
    def test_api_fork_repo(self):
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
@@ -1325,13 +1325,13 @@ class _BaseTestApi(object):
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    @parametrize('fork_name', [
 
    @base.parametrize('fork_name', [
 
        u'api-repo-fork',
 
        u'%s/api-repo-fork' % TEST_REPO_GROUP,
 
    ])
 
    def test_api_fork_repo_non_admin(self, fork_name):
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
@@ -1351,13 +1351,13 @@ class _BaseTestApi(object):
 

	
 
    def test_api_fork_repo_non_admin_specify_owner(self):
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
@@ -1377,13 +1377,13 @@ class _BaseTestApi(object):
 
        finally:
 
            RepoModel().grant_user_permission(repo=self.REPO,
 
                                              user=self.default_user_username,
 
                                              perm='repository.read')
 
            fixture.destroy_repo(fork_name)
 

	
 
    @parametrize('name,perm', [
 
    @base.parametrize('name,perm', [
 
        ('read', 'repository.read'),
 
        ('write', 'repository.write'),
 
        ('admin', 'repository.admin'),
 
    ])
 
    def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
 
        fork_name = u'api-repo-fork'
 
@@ -1422,13 +1422,13 @@ class _BaseTestApi(object):
 
        try:
 
            fork_name = u'api-repo-fork'
 

	
 
            id_, params = _build_data(self.apikey, 'fork_repo',
 
                                      repoid=self.REPO,
 
                                      fork_name=fork_name,
 
                                      owner=TEST_USER_ADMIN_LOGIN,
 
                                      owner=base.TEST_USER_ADMIN_LOGIN,
 
            )
 
            response = api_call(self, params)
 

	
 
            expected = "fork `%s` already exist" % fork_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
@@ -1437,26 +1437,26 @@ class _BaseTestApi(object):
 
    def test_api_fork_repo_repo_exists(self):
 
        fork_name = self.REPO
 

	
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = "repo `%s` already exist" % fork_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create_fork', crash)
 
    def test_api_fork_repo_exception_occurred(self):
 
        fork_name = u'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  owner=base.TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
 
                                                               fork_name)
 
        self._compare_error(id_, expected, given=response.body)
 
@@ -1526,16 +1526,16 @@ class _BaseTestApi(object):
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to create group `%s`' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('changing_attr,updates', [
 
    @base.parametrize('changing_attr,updates', [
 
        ('group_name', {'group_name': u'new_group_name'}),
 
        ('group_name', {'group_name': u'test_group_for_update'}),
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('owner', {'owner': base.TEST_USER_REGULAR_LOGIN}),
 
        ('active', {'active': False}),
 
        ('active', {'active': True}),
 
    ])
 
    def test_api_update_user_group(self, changing_attr, updates):
 
        gr_name = u'test_group_for_update'
 
        user_group = fixture.create_user_group(gr_name)
 
@@ -1571,75 +1571,75 @@ class _BaseTestApi(object):
 
    def test_api_add_user_to_user_group(self):
 
        gr_name = u'test_group'
 
        fixture.create_user_group(gr_name)
 
        try:
 
            id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                      usergroupid=gr_name,
 
                                      userid=TEST_USER_ADMIN_LOGIN)
 
                                      userid=base.TEST_USER_ADMIN_LOGIN)
 
            response = api_call(self, params)
 
            expected = {
 
            'msg': 'added member `%s` to user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name),
 
                    base.TEST_USER_ADMIN_LOGIN, gr_name),
 
            'success': True
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group_that_doesnt_exist(self):
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid='false-group',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
                                  userid=base.TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'user group `%s` does not exist' % 'false-group'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'add_user_to_group', crash)
 
    def test_api_add_user_to_user_group_exception_occurred(self):
 
        gr_name = u'test_group'
 
        fixture.create_user_group(gr_name)
 
        try:
 
            id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                      usergroupid=gr_name,
 
                                      userid=TEST_USER_ADMIN_LOGIN)
 
                                      userid=base.TEST_USER_ADMIN_LOGIN)
 
            response = api_call(self, params)
 
            expected = 'failed to add member to user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_remove_user_from_user_group(self):
 
        gr_name = u'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        UserGroupModel().add_user_to_group(gr, user=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        try:
 
            id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                      usergroupid=gr_name,
 
                                      userid=TEST_USER_ADMIN_LOGIN)
 
                                      userid=base.TEST_USER_ADMIN_LOGIN)
 
            response = api_call(self, params)
 
            expected = {
 
                'msg': 'removed member `%s` from user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name
 
                    base.TEST_USER_ADMIN_LOGIN, gr_name
 
                ),
 
                'success': True}
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'remove_user_from_group', crash)
 
    def test_api_remove_user_from_user_group_exception_occurred(self):
 
        gr_name = u'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        UserGroupModel().add_user_to_group(gr, user=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        try:
 
            id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                      usergroupid=gr_name,
 
                                      userid=TEST_USER_ADMIN_LOGIN)
 
                                      userid=base.TEST_USER_ADMIN_LOGIN)
 
            response = api_call(self, params)
 
            expected = 'failed to remove member from user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
@@ -1690,91 +1690,91 @@ class _BaseTestApi(object):
 
                response = api_call(self, params)
 
                expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name)
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @parametrize('name,perm', [
 
    @base.parametrize('name,perm', [
 
        ('none', 'repository.none'),
 
        ('read', 'repository.read'),
 
        ('write', 'repository.write'),
 
        ('admin', 'repository.admin'),
 
    ])
 
    def test_api_grant_user_permission(self, name, perm):
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  userid=base.TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
 
                perm, TEST_USER_ADMIN_LOGIN, self.REPO
 
                perm, base.TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_grant_user_permission_wrong_permission(self):
 
        perm = 'haha.no.permission'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  userid=base.TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'permission `%s` does not exist' % perm
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'grant_user_permission', crash)
 
    def test_api_grant_user_permission_exception_when_adding(self):
 
        perm = 'repository.read'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  userid=base.TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
 
            TEST_USER_ADMIN_LOGIN, self.REPO
 
            base.TEST_USER_ADMIN_LOGIN, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_revoke_user_permission(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN, )
 
                                  userid=base.TEST_USER_ADMIN_LOGIN, )
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
 
                TEST_USER_ADMIN_LOGIN, self.REPO
 
                base.TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'revoke_user_permission', crash)
 
    def test_api_revoke_user_permission_exception_when_adding(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN, )
 
                                  userid=base.TEST_USER_ADMIN_LOGIN, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
 
            TEST_USER_ADMIN_LOGIN, self.REPO
 
            base.TEST_USER_ADMIN_LOGIN, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,perm', [
 
    @base.parametrize('name,perm', [
 
        ('none', 'repository.none'),
 
        ('read', 'repository.read'),
 
        ('write', 'repository.write'),
 
        ('admin', 'repository.admin'),
 
    ])
 
    def test_api_grant_user_group_permission(self, name, perm):
 
@@ -1850,13 +1850,13 @@ class _BaseTestApi(object):
 

	
 
        expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
 
            TEST_USER_GROUP, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,perm,apply_to_children', [
 
    @base.parametrize('name,perm,apply_to_children', [
 
        ('none', 'group.none', 'none'),
 
        ('read', 'group.read', 'none'),
 
        ('write', 'group.write', 'none'),
 
        ('admin', 'group.admin', 'none'),
 

	
 
        ('none', 'group.none', 'all'),
 
@@ -1875,26 +1875,26 @@ class _BaseTestApi(object):
 
        ('admin', 'group.admin', 'groups'),
 
    ])
 
    def test_api_grant_user_permission_to_repo_group(self, name, perm, apply_to_children):
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission_to_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  userid=base.TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm, apply_to_children=apply_to_children)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                perm, apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                perm, apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parametrize('name,perm,apply_to_children,grant_admin,access_ok', [
 
    @base.parametrize('name,perm,apply_to_children,grant_admin,access_ok', [
 
        ('none_fails', 'group.none', 'none', False, False),
 
        ('read_fails', 'group.read', 'none', False, False),
 
        ('write_fails', 'group.write', 'none', False, False),
 
        ('admin_fails', 'group.admin', 'none', False, False),
 

	
 
        # with granted perms
 
@@ -1911,19 +1911,19 @@ class _BaseTestApi(object):
 
                                                   'group.admin')
 
            Session().commit()
 

	
 
        id_, params = _build_data(self.apikey_regular,
 
                                  'grant_user_permission_to_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  userid=base.TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm, apply_to_children=apply_to_children)
 
        response = api_call(self, params)
 
        if access_ok:
 
            ret = {
 
                'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                    perm, apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                    perm, apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                ),
 
                'success': True
 
            }
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        else:
 
@@ -1932,62 +1932,62 @@ class _BaseTestApi(object):
 

	
 
    def test_api_grant_user_permission_to_repo_group_wrong_permission(self):
 
        perm = 'haha.no.permission'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission_to_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  userid=base.TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'permission `%s` does not exist' % perm
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoGroupModel, 'grant_user_permission', crash)
 
    def test_api_grant_user_permission_to_repo_group_exception_when_adding(self):
 
        perm = 'group.read'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission_to_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  userid=base.TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
 
            TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
            base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,apply_to_children', [
 
    @base.parametrize('name,apply_to_children', [
 
        ('none', 'none'),
 
        ('all', 'all'),
 
        ('repos', 'repos'),
 
        ('groups', 'groups'),
 
    ])
 
    def test_api_revoke_user_permission_from_repo_group(self, name, apply_to_children):
 
        RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
 
                                               user=TEST_USER_ADMIN_LOGIN,
 
                                               user=base.TEST_USER_ADMIN_LOGIN,
 
                                               perm='group.read',)
 
        Session().commit()
 

	
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission_from_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  userid=base.TEST_USER_ADMIN_LOGIN,
 
                                  apply_to_children=apply_to_children,)
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
            ),
 
            'success': True
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parametrize('name,apply_to_children,grant_admin,access_ok', [
 
    @base.parametrize('name,apply_to_children,grant_admin,access_ok', [
 
        ('none', 'none', False, False),
 
        ('all', 'all', False, False),
 
        ('repos', 'repos', False, False),
 
        ('groups', 'groups', False, False),
 

	
 
        # after granting admin rights
 
@@ -1996,32 +1996,32 @@ class _BaseTestApi(object):
 
        ('repos', 'repos', False, False),
 
        ('groups', 'groups', False, False),
 
    ])
 
    def test_api_revoke_user_permission_from_repo_group_by_regular_user(
 
            self, name, apply_to_children, grant_admin, access_ok):
 
        RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
 
                                               user=TEST_USER_ADMIN_LOGIN,
 
                                               user=base.TEST_USER_ADMIN_LOGIN,
 
                                               perm='group.read',)
 
        Session().commit()
 

	
 
        if grant_admin:
 
            RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
 
                                                   self.TEST_USER_LOGIN,
 
                                                   'group.admin')
 
            Session().commit()
 

	
 
        id_, params = _build_data(self.apikey_regular,
 
                                  'revoke_user_permission_from_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  userid=base.TEST_USER_ADMIN_LOGIN,
 
                                  apply_to_children=apply_to_children,)
 
        response = api_call(self, params)
 
        if access_ok:
 
            expected = {
 
                'msg': 'Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                    apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                    apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                ),
 
                'success': True
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        else:
 
            expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
 
@@ -2029,21 +2029,21 @@ class _BaseTestApi(object):
 

	
 
    @mock.patch.object(RepoGroupModel, 'revoke_user_permission', crash)
 
    def test_api_revoke_user_permission_from_repo_group_exception_when_adding(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission_from_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  userid=TEST_USER_ADMIN_LOGIN, )
 
                                  userid=base.TEST_USER_ADMIN_LOGIN, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo group: `%s`' % (
 
            TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
            base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,perm,apply_to_children', [
 
    @base.parametrize('name,perm,apply_to_children', [
 
        ('none', 'group.none', 'none'),
 
        ('read', 'group.read', 'none'),
 
        ('write', 'group.write', 'none'),
 
        ('admin', 'group.admin', 'none'),
 

	
 
        ('none', 'group.none', 'all'),
 
@@ -2076,13 +2076,13 @@ class _BaseTestApi(object):
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parametrize('name,perm,apply_to_children,grant_admin,access_ok', [
 
    @base.parametrize('name,perm,apply_to_children,grant_admin,access_ok', [
 
        ('none_fails', 'group.none', 'none', False, False),
 
        ('read_fails', 'group.read', 'none', False, False),
 
        ('write_fails', 'group.write', 'none', False, False),
 
        ('admin_fails', 'group.admin', 'none', False, False),
 

	
 
        # with granted perms
 
@@ -2143,13 +2143,13 @@ class _BaseTestApi(object):
 

	
 
        expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
 
            TEST_USER_GROUP, TEST_REPO_GROUP
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,apply_to_children', [
 
    @base.parametrize('name,apply_to_children', [
 
        ('none', 'none'),
 
        ('all', 'all'),
 
        ('repos', 'repos'),
 
        ('groups', 'groups'),
 
    ])
 
    def test_api_revoke_user_group_permission_from_repo_group(self, name, apply_to_children):
 
@@ -2169,13 +2169,13 @@ class _BaseTestApi(object):
 
                apply_to_children, TEST_USER_GROUP, TEST_REPO_GROUP
 
            ),
 
            'success': True
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parametrize('name,apply_to_children,grant_admin,access_ok', [
 
    @base.parametrize('name,apply_to_children,grant_admin,access_ok', [
 
        ('none', 'none', False, False),
 
        ('all', 'all', False, False),
 
        ('repos', 'repos', False, False),
 
        ('groups', 'groups', False, False),
 

	
 
        # after granting admin rights
 
@@ -2184,13 +2184,13 @@ class _BaseTestApi(object):
 
        ('repos', 'repos', False, False),
 
        ('groups', 'groups', False, False),
 
    ])
 
    def test_api_revoke_user_group_permission_from_repo_group_by_regular_user(
 
            self, name, apply_to_children, grant_admin, access_ok):
 
        RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
 
                                               user=TEST_USER_ADMIN_LOGIN,
 
                                               user=base.TEST_USER_ADMIN_LOGIN,
 
                                               perm='group.read',)
 
        Session().commit()
 

	
 
        if grant_admin:
 
            RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
 
                                                   self.TEST_USER_LOGIN,
 
@@ -2203,13 +2203,13 @@ class _BaseTestApi(object):
 
                                  usergroupid=TEST_USER_GROUP,
 
                                  apply_to_children=apply_to_children,)
 
        response = api_call(self, params)
 
        if access_ok:
 
            expected = {
 
                'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
 
                    apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                    apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                ),
 
                'success': True
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        else:
 
            expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
 
@@ -2307,13 +2307,13 @@ class _BaseTestApi(object):
 
        expected = response.json
 
        assert len(response.json['result']) == 3
 
        #self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_gists_regular_user_with_different_userid(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_gists',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
                                  userid=base.TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_gist(self):
 
        id_, params = _build_data(self.apikey_regular, 'create_gist',
 
@@ -2498,16 +2498,16 @@ class _BaseTestApi(object):
 
            "description": "No description",
 
            "url": "/%s/pull-request/%s/_/%s" % (self.REPO, pull_request_id, "stable"),
 
            "reviewers": [{"username": "test_regular"}],
 
            "org_repo_url": "http://localhost:80/%s" % self.REPO,
 
            "org_ref_parts": ["branch", "stable", self.TEST_PR_SRC],
 
            "other_ref_parts": ["branch", "default", self.TEST_PR_DST],
 
            "comments": [{"username": TEST_USER_ADMIN_LOGIN, "text": "",
 
            "comments": [{"username": base.TEST_USER_ADMIN_LOGIN, "text": "",
 
                         "comment_id": pullrequest.comments[0].comment_id}],
 
            "owner": TEST_USER_ADMIN_LOGIN,
 
            "statuses": [{"status": "under_review", "reviewer": TEST_USER_ADMIN_LOGIN, "modified_at": "2000-01-01T00:00:00"} for i in range(0, len(self.TEST_PR_REVISIONS))],
 
            "owner": base.TEST_USER_ADMIN_LOGIN,
 
            "statuses": [{"status": "under_review", "reviewer": base.TEST_USER_ADMIN_LOGIN, "modified_at": "2000-01-01T00:00:00"} for i in range(0, len(self.TEST_PR_REVISIONS))],
 
            "title": "get test",
 
            "revisions": self.TEST_PR_REVISIONS,
 
        }
 
        self._compare_ok(random_id, expected,
 
                         given=re.sub(br"\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d",
 
                                      b"2000-01-01T00:00:00", response.body))
 
@@ -2531,23 +2531,23 @@ class _BaseTestApi(object):
 
    def test_api_status_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u"status test")
 

	
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(TEST_USER_REGULAR2_LOGIN).api_key,
 
            "api_key": User.get_by_username(base.TEST_USER_REGULAR2_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        })
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        self._compare_error(random_id, "No permission to change pull request status. User needs to be admin, owner or reviewer.", given=response.body)
 
        assert ChangesetStatus.STATUS_UNDER_REVIEW == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(TEST_USER_REGULAR_LOGIN).api_key,
 
            "api_key": User.get_by_username(base.TEST_USER_REGULAR_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        })
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
kallithea/tests/functional/test_admin.py
Show inline comments
 
@@ -3,19 +3,19 @@ import datetime
 
import os
 
from os.path import dirname
 

	
 
from kallithea.lib.utils2 import safe_unicode
 
from kallithea.model.db import UserLog
 
from kallithea.model.meta import Session
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'fixtures')
 

	
 

	
 
class TestAdminController(TestController):
 
class TestAdminController(base.TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
@@ -49,111 +49,111 @@ class TestAdminController(TestController
 
    def teardown_class(cls):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index'))
 
        response = self.app.get(base.url(controller='admin/admin', action='index'))
 
        response.mustcontain('Admin Journal')
 

	
 
    def test_filter_all_entries(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',))
 
        response = self.app.get(base.url(controller='admin/admin', action='index',))
 
        response.mustcontain(' 2036 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:xxx'))
 
        response.mustcontain(' 3 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_repository_CamelCase(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:XxX'))
 
        response.mustcontain(' 3 Entries')
 

	
 
    def test_filter_journal_filter_wildcard_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:*test*'))
 
        response.mustcontain(' 862 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:test*'))
 
        response.mustcontain(' 257 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_CamelCase(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:Test*'))
 
        response.mustcontain(' 257 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_and_user(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:test* AND username:demo'))
 
        response.mustcontain(' 130 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_or_other_repo(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:test* OR repository:xxx'))
 
        response.mustcontain(' 260 Entries')  # 257 + 3
 

	
 
    def test_filter_journal_filter_exact_match_on_username(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:demo'))
 
        response.mustcontain(' 1087 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_username_camelCase(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:DemO'))
 
        response.mustcontain(' 1087 Entries')
 

	
 
    def test_filter_journal_filter_wildcard_on_username(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:*test*'))
 
        response.mustcontain(' 100 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_username(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:demo*'))
 
        response.mustcontain(' 1101 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_user_or_other_user(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:demo OR username:volcan'))
 
        response.mustcontain(' 1095 Entries')  # 1087 + 8
 

	
 
    def test_filter_journal_filter_wildcard_on_action(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='action:*pull_request*'))
 
        response.mustcontain(' 187 Entries')
 

	
 
    def test_filter_journal_filter_on_date(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='date:20121010'))
 
        response.mustcontain(' 47 Entries')
 

	
 
    def test_filter_journal_filter_on_date_2(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='date:20121020'))
 
        response.mustcontain(' 17 Entries')
 

	
 
    @parametrize('filter,hit', [
 
    @base.parametrize('filter,hit', [
 
        #### "repository:" filtering
 
        # "/" is used for grouping
 
        ('repository:group/test', 4),
 
        # "-" is often used for "-fork"
 
        ('repository:fork-test1', 5),
 
        # using "stop words"
 
@@ -186,12 +186,12 @@ class TestAdminController(TestController
 
        ('username:THIS-*', 2),
 
        ('username:*-IT', 2),
 
    ])
 
    def test_filter_journal_filter_tokenization(self, filter, hit):
 
        self.log_user()
 

	
 
        response = self.app.get(url(controller='admin/admin', action='index',
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter=filter))
 
        if hit != 1:
 
            response.mustcontain(' %s Entries' % hit)
 
        else:
 
            response.mustcontain(' 1 Entry')
kallithea/tests/functional/test_admin_auth_settings.py
Show inline comments
 
from kallithea.model.db import Setting
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
class TestAuthSettingsController(TestController):
 
class TestAuthSettingsController(base.TestController):
 
    def _enable_plugins(self, plugins_list):
 
        test_url = url(controller='admin/auth_settings',
 
        test_url = base.url(controller='admin/auth_settings',
 
                       action='auth_settings')
 
        params={'auth_plugins': plugins_list, '_session_csrf_secret_token': self.session_csrf_secret_token()}
 

	
 
        for plugin in plugins_list.split(','):
 
            enable = plugin.partition('kallithea.lib.auth_modules.')[-1]
 
            params.update({'%s_enabled' % enable: True})
 
        response = self.app.post(url=test_url, params=params)
 
        return params
 
        #self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/auth_settings',
 
        response = self.app.get(base.url(controller='admin/auth_settings',
 
                                    action='index'))
 
        response.mustcontain('Authentication Plugins')
 

	
 
    @skipif(not ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    @base.skipif(not base.ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    def test_ldap_save_settings(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_ldap')
 
        params.update({'auth_ldap_host': u'dc.example.com',
 
                       'auth_ldap_port': '999',
 
@@ -38,22 +38,22 @@ class TestAuthSettingsController(TestCon
 
                       'auth_ldap_search_scope': 'BASE',
 
                       'auth_ldap_attr_login': 'test_attr_login',
 
                       'auth_ldap_attr_firstname': 'ima',
 
                       'auth_ldap_attr_lastname': 'tester',
 
                       'auth_ldap_attr_email': 'test@example.com'})
 

	
 
        test_url = url(controller='admin/auth_settings',
 
        test_url = base.url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
        new_settings = Setting.get_auth_settings()
 
        assert new_settings['auth_ldap_host'] == u'dc.example.com', 'fail db write compare'
 

	
 
    @skipif(not ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    @base.skipif(not base.ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    def test_ldap_error_form_wrong_port_number(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_ldap')
 
        params.update({'auth_ldap_host': '',
 
                       'auth_ldap_port': 'i-should-be-number',  # bad port num
 
@@ -65,21 +65,21 @@ class TestAuthSettingsController(TestCon
 
                       'auth_ldap_filter': '',
 
                       'auth_ldap_search_scope': 'BASE',
 
                       'auth_ldap_attr_login': '',
 
                       'auth_ldap_attr_firstname': '',
 
                       'auth_ldap_attr_lastname': '',
 
                       'auth_ldap_attr_email': ''})
 
        test_url = url(controller='admin/auth_settings',
 
        test_url = base.url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 

	
 
        response.mustcontain("""<span class="error-message">"""
 
                             """Please enter a number</span>""")
 

	
 
    @skipif(not ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    @base.skipif(not base.ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    def test_ldap_error_form(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_ldap')
 
        params.update({'auth_ldap_host': 'Host',
 
                       'auth_ldap_port': '123',
 
@@ -92,13 +92,13 @@ class TestAuthSettingsController(TestCon
 
                       'auth_ldap_search_scope': 'BASE',
 
                       'auth_ldap_attr_login': '',  # <----- missing required input
 
                       'auth_ldap_attr_firstname': '',
 
                       'auth_ldap_attr_lastname': '',
 
                       'auth_ldap_attr_email': ''})
 

	
 
        test_url = url(controller='admin/auth_settings',
 
        test_url = base.url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 

	
 
        response.mustcontain("""<span class="error-message">The LDAP Login"""
 
                             """ attribute of the CN must be specified""")
 
@@ -112,22 +112,22 @@ class TestAuthSettingsController(TestCon
 
    def _container_auth_setup(self, **settings):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_container')
 
        params.update(settings)
 

	
 
        test_url = url(controller='admin/auth_settings',
 
        test_url = base.url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        response = response.follow()
 
        response.click('Log Out') # end admin login session
 

	
 
    def _container_auth_verify_login(self, resulting_username, **get_kwargs):
 
        response = self.app.get(
 
            url=url(controller='admin/my_account', action='my_account'),
 
            url=base.url(controller='admin/my_account', action='my_account'),
 
            **get_kwargs
 
        )
 
        response.mustcontain('My Account %s' % resulting_username)
 

	
 
    def test_container_auth_login_header(self):
 
        self._container_auth_setup(
 
@@ -150,13 +150,13 @@ class TestAuthSettingsController(TestCon
 
            auth_container_firstname_header='THE_USER_FIRSTNAME',
 
            auth_container_lastname_header='THE_USER_LASTNAME',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='False',
 
        )
 
        response = self.app.get(
 
            url=url(controller='admin/my_account', action='my_account'),
 
            url=base.url(controller='admin/my_account', action='my_account'),
 
            extra_environ={'THE_USER_NAME': 'johnd',
 
                           'THE_USER_EMAIL': 'john@example.org',
 
                           'THE_USER_FIRSTNAME': 'John',
 
                           'THE_USER_LASTNAME': 'Doe',
 
                           }
 
        )
 
@@ -213,13 +213,13 @@ class TestAuthSettingsController(TestCon
 
            auth_container_firstname_header='',
 
            auth_container_lastname_header='',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='True',
 
        )
 
        response = self.app.get(
 
            url=url(controller='admin/my_account', action='my_account'),
 
            url=base.url(controller='admin/my_account', action='my_account'),
 
            extra_environ={'REMOTE_USER': 'john'},
 
        )
 
        assert b'Log Out' not in response.normal_body
 

	
 
    def test_crowd_save_settings(self):
 
        self.log_user()
 
@@ -229,30 +229,30 @@ class TestAuthSettingsController(TestCon
 
                       'auth_crowd_app_password': 'secret',
 
                       'auth_crowd_admin_groups': 'mygroup',
 
                       'auth_crowd_port': '123',
 
                       'auth_crowd_method': 'https',
 
                       'auth_crowd_app_name': 'xyzzy'})
 

	
 
        test_url = url(controller='admin/auth_settings',
 
        test_url = base.url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
        new_settings = Setting.get_auth_settings()
 
        assert new_settings['auth_crowd_host'] == u'hostname', 'fail db write compare'
 

	
 
    @skipif(not pam_lib_installed, reason='skipping due to missing pam lib')
 
    @base.skipif(not base.pam_lib_installed, reason='skipping due to missing pam lib')
 
    def test_pam_save_settings(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_pam')
 
        params.update({'auth_pam_service': 'kallithea',
 
                       'auth_pam_gecos': '^foo-.*'})
 

	
 
        test_url = url(controller='admin/auth_settings',
 
        test_url = base.url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
        new_settings = Setting.get_auth_settings()
kallithea/tests/functional/test_admin_defaults.py
Show inline comments
 
from kallithea.model.db import Setting
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
class TestDefaultsController(TestController):
 
class TestDefaultsController(base.TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('defaults'))
 
        response = self.app.get(base.url('defaults'))
 
        response.mustcontain('default_repo_private')
 
        response.mustcontain('default_repo_enable_statistics')
 
        response.mustcontain('default_repo_enable_downloads')
 

	
 
    def test_update_params_true_hg(self):
 
        self.log_user()
 
@@ -17,13 +17,13 @@ class TestDefaultsController(TestControl
 
            'default_repo_enable_downloads': True,
 
            'default_repo_enable_statistics': True,
 
            'default_repo_private': True,
 
            'default_repo_type': 'hg',
 
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
        }
 
        response = self.app.post(url('defaults_update', id='default'), params=params)
 
        response = self.app.post(base.url('defaults_update', id='default'), params=params)
 
        self.checkSessionFlash(response, 'Default settings updated successfully')
 

	
 
        params.pop('_session_csrf_secret_token')
 
        defs = Setting.get_default_repo_settings()
 
        assert params == defs
 

	
 
@@ -33,12 +33,12 @@ class TestDefaultsController(TestControl
 
            'default_repo_enable_downloads': False,
 
            'default_repo_enable_statistics': False,
 
            'default_repo_private': False,
 
            'default_repo_type': 'git',
 
            '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
        }
 
        response = self.app.post(url('defaults_update', id='default'), params=params)
 
        response = self.app.post(base.url('defaults_update', id='default'), params=params)
 
        self.checkSessionFlash(response, 'Default settings updated successfully')
 

	
 
        params.pop('_session_csrf_secret_token')
 
        defs = Setting.get_default_repo_settings()
 
        assert params == defs
kallithea/tests/functional/test_admin_gists.py
Show inline comments
 
from kallithea.model.db import Gist, User
 
from kallithea.model.gist import GistModel
 
from kallithea.model.meta import Session
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
def _create_gist(f_name, content='some gist', lifetime=-1,
 
                 description=u'gist-desc', gist_type='public',
 
                 owner=TEST_USER_ADMIN_LOGIN):
 
                 owner=base.TEST_USER_ADMIN_LOGIN):
 
    gist_mapping = {
 
        f_name: {'content': content}
 
    }
 
    owner = User.get_by_username(owner)
 
    gist = GistModel().create(description, owner=owner, ip_addr=IP_ADDR,
 
    gist = GistModel().create(description, owner=owner, ip_addr=base.IP_ADDR,
 
                       gist_mapping=gist_mapping, gist_type=gist_type,
 
                       lifetime=lifetime)
 
    Session().commit()
 
    return gist
 

	
 

	
 
class TestGistsController(TestController):
 
class TestGistsController(base.TestController):
 

	
 
    def teardown_method(self, method):
 
        for g in Gist.query():
 
            GistModel().delete(g)
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('gists'))
 
        response = self.app.get(base.url('gists'))
 
        # Test response...
 
        response.mustcontain('There are no gists yet')
 

	
 
        g1 = _create_gist('gist1').gist_access_id
 
        g2 = _create_gist('gist2', lifetime=1400).gist_access_id
 
        g3 = _create_gist('gist3', description=u'gist3-desc').gist_access_id
 
        g4 = _create_gist('gist4', gist_type='private').gist_access_id
 
        response = self.app.get(url('gists'))
 
        response = self.app.get(base.url('gists'))
 
        # Test response...
 
        response.mustcontain('gist: %s' % g1)
 
        response.mustcontain('gist: %s' % g2)
 
        response.mustcontain('Expires: in 23 hours')  # we don't care about the end
 
        response.mustcontain('gist: %s' % g3)
 
        response.mustcontain('gist3-desc')
 
        response.mustcontain(no=['gist: %s' % g4])
 

	
 
    def test_index_private_gists(self):
 
        self.log_user()
 
        gist = _create_gist('gist5', gist_type='private')
 
        response = self.app.get(url('gists', private=1))
 
        response = self.app.get(base.url('gists', private=1))
 
        # Test response...
 

	
 
        # and privates
 
        response.mustcontain('gist: %s' % gist.gist_access_id)
 

	
 
    def test_create_missing_description(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
        response = self.app.post(base.url('gists'),
 
                                 params={'lifetime': -1, '_session_csrf_secret_token': self.session_csrf_secret_token()},
 
                                 status=200)
 

	
 
        response.mustcontain('Missing value')
 

	
 
    def test_create(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
        response = self.app.post(base.url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': 'foo',
 
                                         'public': 'public',
 
                                         '_session_csrf_secret_token': self.session_csrf_secret_token()},
 
                                 status=302)
 
@@ -74,13 +74,13 @@ class TestGistsController(TestController
 
        response.mustcontain('added file: foo')
 
        response.mustcontain('gist test')
 
        response.mustcontain('<div class="label label-success">Public Gist</div>')
 

	
 
    def test_create_with_path_with_dirs(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
        response = self.app.post(base.url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': '/home/foo',
 
                                         'public': 'public',
 
                                         '_session_csrf_secret_token': self.session_csrf_secret_token()},
 
                                 status=200)
 
@@ -89,17 +89,17 @@ class TestGistsController(TestController
 
    def test_access_expired_gist(self):
 
        self.log_user()
 
        gist = _create_gist('never-see-me')
 
        gist.gist_expires = 0  # 1970
 
        Session().commit()
 

	
 
        response = self.app.get(url('gist', gist_id=gist.gist_access_id), status=404)
 
        response = self.app.get(base.url('gist', gist_id=gist.gist_access_id), status=404)
 

	
 
    def test_create_private(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
        response = self.app.post(base.url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'private gist test',
 
                                         'filename': 'private-foo',
 
                                         'private': 'private',
 
                                         '_session_csrf_secret_token': self.session_csrf_secret_token()},
 
                                 status=302)
 
@@ -107,13 +107,13 @@ class TestGistsController(TestController
 
        response.mustcontain('added file: private-foo<')
 
        response.mustcontain('private gist test')
 
        response.mustcontain('<div class="label label-warning">Private Gist</div>')
 

	
 
    def test_create_with_description(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
        response = self.app.post(base.url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': 'foo-desc',
 
                                         'description': 'gist-desc',
 
                                         'public': 'public',
 
                                         '_session_csrf_secret_token': self.session_csrf_secret_token()},
 
@@ -123,49 +123,49 @@ class TestGistsController(TestController
 
        response.mustcontain('gist test')
 
        response.mustcontain('gist-desc')
 
        response.mustcontain('<div class="label label-success">Public Gist</div>')
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_gist'))
 
        response = self.app.get(base.url('new_gist'))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        gist = _create_gist('delete-me')
 
        response = self.app.post(url('gist_delete', gist_id=gist.gist_id),
 
        response = self.app.post(base.url('gist_delete', gist_id=gist.gist_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
    def test_delete_normal_user_his_gist(self):
 
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        gist = _create_gist('delete-me', owner=TEST_USER_REGULAR_LOGIN)
 
        response = self.app.post(url('gist_delete', gist_id=gist.gist_id),
 
        self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
 
        gist = _create_gist('delete-me', owner=base.TEST_USER_REGULAR_LOGIN)
 
        response = self.app.post(base.url('gist_delete', gist_id=gist.gist_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
    def test_delete_normal_user_not_his_own_gist(self):
 
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
 
        gist = _create_gist('delete-me')
 
        response = self.app.post(url('gist_delete', gist_id=gist.gist_id), status=403,
 
        response = self.app.post(base.url('gist_delete', gist_id=gist.gist_id), status=403,
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
    def test_show(self):
 
        gist = _create_gist('gist-show-me')
 
        response = self.app.get(url('gist', gist_id=gist.gist_access_id))
 
        response = self.app.get(base.url('gist', gist_id=gist.gist_access_id))
 
        response.mustcontain('added file: gist-show-me<')
 
        response.mustcontain('%s - created' % TEST_USER_ADMIN_LOGIN)
 
        response.mustcontain('%s - created' % base.TEST_USER_ADMIN_LOGIN)
 
        response.mustcontain('gist-desc')
 
        response.mustcontain('<div class="label label-success">Public Gist</div>')
 

	
 
    def test_show_as_raw(self):
 
        gist = _create_gist('gist-show-me', content='GIST CONTENT')
 
        response = self.app.get(url('formatted_gist',
 
        response = self.app.get(base.url('formatted_gist',
 
                                    gist_id=gist.gist_access_id, format='raw'))
 
        assert response.body == b'GIST CONTENT'
 

	
 
    def test_show_as_raw_individual_file(self):
 
        gist = _create_gist('gist-show-me-raw', content='GIST BODY')
 
        response = self.app.get(url('formatted_gist_file',
 
        response = self.app.get(base.url('formatted_gist_file',
 
                                    gist_id=gist.gist_access_id, format='raw',
 
                                    revision='tip', f_path='gist-show-me-raw'))
 
        assert response.body == b'GIST BODY'
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_gist', gist_id=1))
 
        response = self.app.get(base.url('edit_gist', gist_id=1))
kallithea/tests/functional/test_admin_permissions.py
Show inline comments
 
from kallithea.model.db import User, UserIpMap
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
class TestAdminPermissionsController(TestController):
 
class TestAdminPermissionsController(base.TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_permissions'))
 
        response = self.app.get(base.url('admin_permissions'))
 
        # Test response...
 

	
 
    def test_index_ips(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_permissions_ips'))
 
        response = self.app.get(base.url('admin_permissions_ips'))
 
        # Test response...
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    def test_add_delete_ips(self, auto_clear_ip_permissions):
 
        self.log_user()
 
        default_user_id = User.get_default_user().user_id
 

	
 
        # Add IP and verify it is shown in UI and both gives access and rejects
 

	
 
        response = self.app.post(url('edit_user_ips_update', id=default_user_id),
 
        response = self.app.post(base.url('edit_user_ips_update', id=default_user_id),
 
                                 params=dict(new_ip='0.0.0.0/24',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        invalidate_all_caches()
 
        response = self.app.get(url('admin_permissions_ips'),
 
        base.invalidate_all_caches()
 
        response = self.app.get(base.url('admin_permissions_ips'),
 
                                extra_environ={'REMOTE_ADDR': '0.0.0.1'})
 
        response.mustcontain('0.0.0.0/24')
 
        response.mustcontain('0.0.0.0 - 0.0.0.255')
 

	
 
        response = self.app.get(url('admin_permissions_ips'),
 
        response = self.app.get(base.url('admin_permissions_ips'),
 
                                extra_environ={'REMOTE_ADDR': '0.0.1.1'}, status=403)
 

	
 
        # Add another IP and verify previously rejected now works
 

	
 
        response = self.app.post(url('edit_user_ips_update', id=default_user_id),
 
        response = self.app.post(base.url('edit_user_ips_update', id=default_user_id),
 
                                 params=dict(new_ip='0.0.1.0/24',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        invalidate_all_caches()
 
        base.invalidate_all_caches()
 

	
 
        response = self.app.get(url('admin_permissions_ips'),
 
        response = self.app.get(base.url('admin_permissions_ips'),
 
                                extra_environ={'REMOTE_ADDR': '0.0.1.1'})
 

	
 
        # Delete latest IP and verify same IP is rejected again
 

	
 
        x = UserIpMap.query().filter_by(ip_addr='0.0.1.0/24').first()
 
        response = self.app.post(url('edit_user_ips_delete', id=default_user_id),
 
        response = self.app.post(base.url('edit_user_ips_delete', id=default_user_id),
 
                                 params=dict(del_ip_id=x.ip_id,
 
                                             _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        invalidate_all_caches()
 
        base.invalidate_all_caches()
 

	
 
        response = self.app.get(url('admin_permissions_ips'),
 
        response = self.app.get(base.url('admin_permissions_ips'),
 
                                extra_environ={'REMOTE_ADDR': '0.0.1.1'}, status=403)
 

	
 
        # Delete first IP and verify unlimited access again
 

	
 
        x = UserIpMap.query().filter_by(ip_addr='0.0.0.0/24').first()
 
        response = self.app.post(url('edit_user_ips_delete', id=default_user_id),
 
        response = self.app.post(base.url('edit_user_ips_delete', id=default_user_id),
 
                                 params=dict(del_ip_id=x.ip_id,
 
                                             _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        invalidate_all_caches()
 
        base.invalidate_all_caches()
 

	
 
        response = self.app.get(url('admin_permissions_ips'),
 
        response = self.app.get(base.url('admin_permissions_ips'),
 
                                extra_environ={'REMOTE_ADDR': '0.0.1.1'})
 

	
 
    def test_index_overview(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_permissions_perms'))
 
        response = self.app.get(base.url('admin_permissions_perms'))
 
        # Test response...
 

	
 
    def test_edit_permissions_permissions(self):
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 

	
 
        # Test unauthenticated access - it will redirect to login page
 
        response = self.app.post(
 
            url('edit_repo_perms_update', repo_name=HG_REPO),
 
            base.url('edit_repo_perms_update', repo_name=base.HG_REPO),
 
            params=dict(
 
                perm_new_member_1='repository.read',
 
                perm_new_member_name_1=user.username,
 
                perm_new_member_type_1='user',
 
                _session_csrf_secret_token=self.session_csrf_secret_token()),
 
            status=302)
 

	
 
        assert not response.location.endswith(url('edit_repo_perms_update', repo_name=HG_REPO))
 
        assert response.location.endswith(url('login_home', came_from=url('edit_repo_perms_update', repo_name=HG_REPO)))
 
        assert not response.location.endswith(base.url('edit_repo_perms_update', repo_name=base.HG_REPO))
 
        assert response.location.endswith(base.url('login_home', came_from=base.url('edit_repo_perms_update', repo_name=base.HG_REPO)))
 

	
 
        response = self.app.post(
 
            url('edit_repo_perms_revoke', repo_name=HG_REPO),
 
            base.url('edit_repo_perms_revoke', repo_name=base.HG_REPO),
 
            params=dict(
 
                obj_type='user',
 
                user_id=user.user_id,
 
                _session_csrf_secret_token=self.session_csrf_secret_token()),
 
            status=302)
 

	
 
        assert response.location.endswith(url('login_home', came_from=url('edit_repo_perms_revoke', repo_name=HG_REPO)))
 
        assert response.location.endswith(base.url('login_home', came_from=base.url('edit_repo_perms_revoke', repo_name=base.HG_REPO)))
 

	
 
        # Test authenticated access
 
        self.log_user()
 

	
 
        response = self.app.post(
 
            url('edit_repo_perms_update', repo_name=HG_REPO),
 
            base.url('edit_repo_perms_update', repo_name=base.HG_REPO),
 
            params=dict(
 
                perm_new_member_1='repository.read',
 
                perm_new_member_name_1=user.username,
 
                perm_new_member_type_1='user',
 
                _session_csrf_secret_token=self.session_csrf_secret_token()),
 
            status=302)
 

	
 
        assert response.location.endswith(url('edit_repo_perms_update', repo_name=HG_REPO))
 
        assert response.location.endswith(base.url('edit_repo_perms_update', repo_name=base.HG_REPO))
 

	
 
        response = self.app.post(
 
            url('edit_repo_perms_revoke', repo_name=HG_REPO),
 
            base.url('edit_repo_perms_revoke', repo_name=base.HG_REPO),
 
            params=dict(
 
                obj_type='user',
 
                user_id=user.user_id,
 
                _session_csrf_secret_token=self.session_csrf_secret_token()),
 
            status=200)
 
        assert not response.body
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
@@ -10,13 +10,13 @@ from kallithea.lib import vcs
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.model.db import Permission, RepoGroup, Repository, Ui, User, UserRepoToPerm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture, error_function
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
@@ -26,38 +26,38 @@ def _get_permission_for_user(user, repo)
 
                        Repository.get_by_repo_name(repo)) \
 
                .filter(UserRepoToPerm.user == User.get_by_username(user)) \
 
                .all()
 
    return perm
 

	
 

	
 
class _BaseTestCase(TestController):
 
class _BaseTestCase(base.TestController):
 
    """
 
    Write all tests here
 
    """
 
    REPO = None
 
    REPO_TYPE = None
 
    NEW_REPO = None
 
    OTHER_TYPE_REPO = None
 
    OTHER_TYPE = None
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('repos'))
 
        response = self.app.get(base.url('repos'))
 

	
 
    def test_create(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name))
 
        assert response.json == {u'result': True}
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name, repo_name))
 

	
 
        # test if the repo was created in the database
 
@@ -65,13 +65,13 @@ class _BaseTestCase(TestController):
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        assert new_repo.repo_name == repo_name
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 
@@ -82,21 +82,21 @@ class _BaseTestCase(TestController):
 
        Session().commit()
 

	
 
    def test_case_insensitivity(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                                 fixture._get_repo_create_params(repo_private=False,
 
                                                                 repo_name=repo_name,
 
                                                                 repo_type=self.REPO_TYPE,
 
                                                                 repo_description=description,
 
                                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        # try to create repo with swapped case
 
        swapped_repo_name = repo_name.swapcase()
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                                 fixture._get_repo_create_params(repo_private=False,
 
                                                                 repo_name=swapped_repo_name,
 
                                                                 repo_type=self.REPO_TYPE,
 
                                                                 repo_description=description,
 
                                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        response.mustcontain('already exists')
 
@@ -108,27 +108,27 @@ class _BaseTestCase(TestController):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = u'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_ADMIN_LOGIN)
 
                                     owner=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 

	
 
        repo_name = u'ingroup'
 
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
 
        assert response.json == {u'result': True}
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
@@ -136,13 +136,13 @@ class _BaseTestCase(TestController):
 
        new_repo_id = new_repo.repo_id
 

	
 
        assert new_repo.repo_name == repo_name_full
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        assert len(inherited_perms) == 1
 
@@ -157,47 +157,47 @@ class _BaseTestCase(TestController):
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_in_group_without_needed_permissions(self):
 
        usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        usr = self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
 
        # avoid spurious RepoGroup DetachedInstanceError ...
 
        session_csrf_secret_token = self.session_csrf_secret_token()
 
        # revoke
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
 

	
 
        # disable on regular user
 
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
 
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
 
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
 
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
 
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
 
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.none')
 
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
 
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
 
        Session().commit()
 

	
 
        ## create GROUP
 
        group_name = u'reg_sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_ADMIN_LOGIN)
 
                                     owner=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 

	
 
        group_name_allowed = u'reg_sometest_allowed_%s' % self.REPO_TYPE
 
        gr_allowed = RepoGroupModel().create(group_name=group_name_allowed,
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_REGULAR_LOGIN)
 
                                     owner=base.TEST_USER_REGULAR_LOGIN)
 
        Session().commit()
 

	
 
        repo_name = u'ingroup'
 
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                _session_csrf_secret_token=session_csrf_secret_token))
 
@@ -205,22 +205,22 @@ class _BaseTestCase(TestController):
 
        response.mustcontain('Invalid value')
 

	
 
        # user is allowed to create in this group
 
        repo_name = u'ingroup'
 
        repo_name_full = RepoGroup.url_sep().join([group_name_allowed, repo_name])
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr_allowed.group_id,
 
                                                _session_csrf_secret_token=session_csrf_secret_token))
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
 
        assert response.json == {u'result': True}
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
@@ -228,13 +228,13 @@ class _BaseTestCase(TestController):
 
        new_repo_id = new_repo.repo_id
 

	
 
        assert new_repo.repo_name == repo_name_full
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        assert len(inherited_perms) == 1
 
@@ -256,46 +256,46 @@ class _BaseTestCase(TestController):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = u'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_ADMIN_LOGIN)
 
                                     owner=base.TEST_USER_ADMIN_LOGIN)
 
        perm = Permission.get_by_key('repository.write')
 
        RepoGroupModel().grant_user_permission(gr, TEST_USER_REGULAR_LOGIN, perm)
 
        RepoGroupModel().grant_user_permission(gr, base.TEST_USER_REGULAR_LOGIN, perm)
 

	
 
        ## add repo permissions
 
        Session().commit()
 

	
 
        repo_name = u'ingroup_inherited_%s' % self.REPO_TYPE
 
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                repo_copy_permissions=True,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name_full))
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        assert new_repo.repo_name == repo_name_full
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
@@ -306,81 +306,81 @@ class _BaseTestCase(TestController):
 

	
 
        # check if inherited permissiona are applied
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        assert len(inherited_perms) == 2
 

	
 
        assert TEST_USER_REGULAR_LOGIN in [x.user.username
 
        assert base.TEST_USER_REGULAR_LOGIN in [x.user.username
 
                                                    for x in inherited_perms]
 
        assert 'repository.write' in [x.permission.permission_name
 
                                               for x in inherited_perms]
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_remote_repo_wrong_clone_uri(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                clone_uri='http://127.0.0.1/repo',
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        response.mustcontain('Invalid repository URL')
 

	
 
    def test_create_remote_repo_wrong_clone_uri_hg_svn(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                clone_uri='svn+http://127.0.0.1/repo',
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        response.mustcontain('Invalid repository URL')
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        repo_name = u'vcs_test_new_to_delete_%s' % self.REPO_TYPE
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_name=repo_name,
 
                                                repo_description=description,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name))
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name, repo_name))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        assert new_repo.repo_name == repo_name
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.post(url('delete_repo', repo_name=repo_name),
 
        response = self.app.post(base.url('delete_repo', repo_name=repo_name),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
 

	
 
        response.follow()
 

	
 
@@ -396,43 +396,43 @@ class _BaseTestCase(TestController):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (safe_str(self.NEW_REPO), non_ascii)
 
        repo_name_unicode = safe_unicode(repo_name)
 
        description = 'description for newly created repo' + non_ascii
 
        description_unicode = safe_unicode(description)
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        response = self.app.get(base.url('repo_check_home', repo_name=repo_name))
 
        assert response.json == {u'result': True}
 
        self.checkSessionFlash(response,
 
                               u'Created repository <a href="/%s">%s</a>'
 
                               % (urllib.quote(repo_name), repo_name_unicode))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        assert new_repo.repo_name == repo_name_unicode
 
        assert new_repo.description == description_unicode
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response = self.app.get(base.url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_unicode)))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.post(url('delete_repo', repo_name=repo_name),
 
        response = self.app.post(base.url('delete_repo', repo_name=repo_name),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name_unicode))
 
        response.follow()
 

	
 
        # check if repo was deleted from db
 
        deleted_repo = Session().query(Repository) \
 
@@ -444,50 +444,50 @@ class _BaseTestCase(TestController):
 

	
 
    def test_delete_repo_with_group(self):
 
        # TODO:
 
        pass
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('delete_repo', repo_name=self.REPO),
 
        response = self.app.post(base.url('delete_repo', repo_name=self.REPO),
 
                                 params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
    def test_show(self):
 
        self.log_user()
 
        response = self.app.get(url('summary_home', repo_name=self.REPO))
 
        response = self.app.get(base.url('summary_home', repo_name=self.REPO))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_repo', repo_name=self.REPO))
 
        response = self.app.get(base.url('edit_repo', repo_name=self.REPO))
 

	
 
    def test_set_private_flag_sets_default_to_none(self):
 
        self.log_user()
 
        # initially repository perm should be read
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        assert len(perm), 1
 
        assert perm[0].permission.permission_name == 'repository.read'
 
        assert Repository.get_by_repo_name(self.REPO).private == False
 

	
 
        response = self.app.post(url('update_repo', repo_name=self.REPO),
 
        response = self.app.post(base.url('update_repo', repo_name=self.REPO),
 
                        fixture._get_repo_create_params(repo_private=1,
 
                                                repo_name=self.REPO,
 
                                                repo_type=self.REPO_TYPE,
 
                                                owner=TEST_USER_ADMIN_LOGIN,
 
                                                owner=base.TEST_USER_ADMIN_LOGIN,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        self.checkSessionFlash(response,
 
                               msg='Repository %s updated successfully' % (self.REPO))
 
        assert Repository.get_by_repo_name(self.REPO).private == True
 

	
 
        # now the repo default permission should be None
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        assert len(perm), 1
 
        assert perm[0].permission.permission_name == 'repository.none'
 

	
 
        response = self.app.post(url('update_repo', repo_name=self.REPO),
 
        response = self.app.post(base.url('update_repo', repo_name=self.REPO),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=self.REPO,
 
                                                repo_type=self.REPO_TYPE,
 
                                                owner=TEST_USER_ADMIN_LOGIN,
 
                                                owner=base.TEST_USER_ADMIN_LOGIN,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        self.checkSessionFlash(response,
 
                               msg='Repository %s updated successfully' % (self.REPO))
 
        assert Repository.get_by_repo_name(self.REPO).private == False
 

	
 
        # we turn off private now the repo default permission should stay None
 
@@ -499,23 +499,23 @@ class _BaseTestCase(TestController):
 
        perm[0].permission = Permission.get_by_key('repository.read')
 
        Session().commit()
 

	
 
    def test_set_repo_fork_has_no_self_id(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        response = self.app.get(url('edit_repo_advanced', repo_name=self.REPO))
 
        response = self.app.get(base.url('edit_repo_advanced', repo_name=self.REPO))
 
        opt = """<option value="%s">%s</option>""" % (repo.repo_id, self.REPO)
 
        response.mustcontain(no=[opt])
 

	
 
    def test_set_fork_of_other_repo(self):
 
        self.log_user()
 
        other_repo = u'other_%s' % self.REPO_TYPE
 
        fixture.create_repo(other_repo, repo_type=self.REPO_TYPE)
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(other_repo)
 
        response = self.app.post(url('edit_repo_advanced_fork', repo_name=self.REPO),
 
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo2.repo_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(other_repo)
 
        self.checkSessionFlash(response,
 
            'Marked repository %s as fork of %s' % (repo.repo_name, repo2.repo_name))
 

	
 
@@ -530,62 +530,62 @@ class _BaseTestCase(TestController):
 
        fixture.destroy_repo(other_repo, forks='detach')
 

	
 
    def test_set_fork_of_other_type_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
 
        response = self.app.post(url('edit_repo_advanced_fork', repo_name=self.REPO),
 
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo2.repo_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
 
        self.checkSessionFlash(response,
 
            'Cannot set repository as fork of repository with other type')
 

	
 
    def test_set_fork_of_none(self):
 
        self.log_user()
 
        ## mark it as None
 
        response = self.app.post(url('edit_repo_advanced_fork', repo_name=self.REPO),
 
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=None, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
 
        self.checkSessionFlash(response,
 
                               'Marked repository %s as fork of %s'
 
                               % (repo.repo_name, "Nothing"))
 
        assert repo.fork is None
 

	
 
    def test_set_fork_of_same_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        response = self.app.post(url('edit_repo_advanced_fork', repo_name=self.REPO),
 
        response = self.app.post(base.url('edit_repo_advanced_fork', repo_name=self.REPO),
 
                                params=dict(id_fork_of=repo.repo_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        self.checkSessionFlash(response,
 
                               'An error occurred during this operation')
 

	
 
    def test_create_on_top_level_without_permissions(self):
 
        usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        usr = self.log_user(base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR_PASS)
 
        # revoke
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
 

	
 
        # disable on regular user
 
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
 
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
 
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
 
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
 
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
 
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.create.none')
 
        user_model.revoke_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
 
        user_model.grant_perm(base.TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
 
        Session().commit()
 

	
 

	
 
        user = User.get(usr['user_id'])
 

	
 
        repo_name = self.NEW_REPO + u'no_perms'
 
        description = 'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
@@ -597,13 +597,13 @@ class _BaseTestCase(TestController):
 
    @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
 
    def test_create_repo_when_filesystem_op_fails(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 

	
 
        response = self.app.post(url('repos'),
 
        response = self.app.post(base.url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
@@ -615,29 +615,29 @@ class _BaseTestCase(TestController):
 

	
 
        # repo must not be in filesystem !
 
        assert not os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name))
 

	
 

	
 
class TestAdminReposControllerGIT(_BaseTestCase):
 
    REPO = GIT_REPO
 
    REPO = base.GIT_REPO
 
    REPO_TYPE = 'git'
 
    NEW_REPO = NEW_GIT_REPO
 
    OTHER_TYPE_REPO = HG_REPO
 
    NEW_REPO = base.NEW_GIT_REPO
 
    OTHER_TYPE_REPO = base.HG_REPO
 
    OTHER_TYPE = 'hg'
 

	
 

	
 
class TestAdminReposControllerHG(_BaseTestCase):
 
    REPO = HG_REPO
 
    REPO = base.HG_REPO
 
    REPO_TYPE = 'hg'
 
    NEW_REPO = NEW_HG_REPO
 
    OTHER_TYPE_REPO = GIT_REPO
 
    NEW_REPO = base.NEW_HG_REPO
 
    OTHER_TYPE_REPO = base.GIT_REPO
 
    OTHER_TYPE = 'git'
 

	
 
    def test_permanent_url_protocol_access(self):
 
        repo = Repository.get_by_repo_name(self.REPO)
 
        permanent_name = '_%d' % repo.repo_id
 

	
 
        # 400 Bad Request - Unable to detect pull/push action
 
        self.app.get(url('summary_home', repo_name=permanent_name),
 
        self.app.get(base.url('summary_home', repo_name=permanent_name),
 
            extra_environ={'HTTP_ACCEPT': 'application/mercurial'},
 
            status=400,
 
        )
kallithea/tests/functional/test_admin_settings.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from kallithea.model.db import Setting, Ui
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestAdminSettingsController(TestController):
 
class TestAdminSettingsController(base.TestController):
 

	
 
    def test_index_main(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings'))
 
        response = self.app.get(base.url('admin_settings'))
 

	
 
    def test_index_mapping(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_mapping'))
 
        response = self.app.get(base.url('admin_settings_mapping'))
 

	
 
    def test_index_global(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_global'))
 
        response = self.app.get(base.url('admin_settings_global'))
 

	
 
    def test_index_visual(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_visual'))
 
        response = self.app.get(base.url('admin_settings_visual'))
 

	
 
    def test_index_email(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_email'))
 
        response = self.app.get(base.url('admin_settings_email'))
 

	
 
    def test_index_hooks(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_hooks'))
 
        response = self.app.get(base.url('admin_settings_hooks'))
 

	
 
    def test_create_custom_hook(self):
 
        self.log_user()
 
        response = self.app.post(url('admin_settings_hooks'),
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_1',
 
                                            new_hook_ui_value='cd %s' % TESTS_TMP_PATH,
 
                                            new_hook_ui_value='cd %s' % base.TESTS_TMP_PATH,
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        self.checkSessionFlash(response, 'Added new hook')
 
        response = response.follow()
 
        response.mustcontain('test_hooks_1')
 
        response.mustcontain('cd %s' % TESTS_TMP_PATH)
 
        response.mustcontain('cd %s' % base.TESTS_TMP_PATH)
 

	
 
    def test_edit_custom_hook(self):
 
        self.log_user()
 
        response = self.app.post(url('admin_settings_hooks'),
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(hook_ui_key='test_hooks_1',
 
                                            hook_ui_value='old_value_of_hook_1',
 
                                            hook_ui_value_new='new_value_of_hook_1',
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        response = response.follow()
 
        response.mustcontain('test_hooks_1')
 
        response.mustcontain('new_value_of_hook_1')
 

	
 
    def test_add_existing_custom_hook(self):
 
        self.log_user()
 
        response = self.app.post(url('admin_settings_hooks'),
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_1',
 
                                            new_hook_ui_value='attempted_new_value',
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        self.checkSessionFlash(response, 'Hook already exists')
 
        response = response.follow()
 
        response.mustcontain('test_hooks_1')
 
        response.mustcontain('new_value_of_hook_1')
 

	
 
    def test_create_custom_hook_delete(self):
 
        self.log_user()
 
        response = self.app.post(url('admin_settings_hooks'),
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_2',
 
                                            new_hook_ui_value='cd %s2' % TESTS_TMP_PATH,
 
                                            new_hook_ui_value='cd %s2' % base.TESTS_TMP_PATH,
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        self.checkSessionFlash(response, 'Added new hook')
 
        response = response.follow()
 
        response.mustcontain('test_hooks_2')
 
        response.mustcontain('cd %s2' % TESTS_TMP_PATH)
 
        response.mustcontain('cd %s2' % base.TESTS_TMP_PATH)
 

	
 
        hook_id = Ui.get_by_key('hooks', 'test_hooks_2').ui_id
 
        ## delete
 
        self.app.post(url('admin_settings_hooks'),
 
        self.app.post(base.url('admin_settings_hooks'),
 
                        params=dict(hook_id=hook_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 
        response = self.app.get(url('admin_settings_hooks'))
 
        response = self.app.get(base.url('admin_settings_hooks'))
 
        response.mustcontain(no=['test_hooks_2'])
 
        response.mustcontain(no=['cd %s2' % TESTS_TMP_PATH])
 
        response.mustcontain(no=['cd %s2' % base.TESTS_TMP_PATH])
 

	
 
    def test_add_existing_builtin_hook(self):
 
        self.log_user()
 
        response = self.app.post(url('admin_settings_hooks'),
 
        response = self.app.post(base.url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='changegroup.update',
 
                                            new_hook_ui_value='attempted_new_value',
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        self.checkSessionFlash(response, 'Builtin hooks are read-only')
 
        response = response.follow()
 
        response.mustcontain('changegroup.update')
 
        response.mustcontain('hg update &gt;&amp;2')
 

	
 
    def test_index_search(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_search'))
 
        response = self.app.get(base.url('admin_settings_search'))
 

	
 
    def test_index_system(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_system'))
 
        response = self.app.get(base.url('admin_settings_system'))
 

	
 
    def test_ga_code_active(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = 'ga-test-123456789'
 
        response = self.app.post(url('admin_settings_global'),
 
        response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
 
@@ -133,13 +133,13 @@ class TestAdminSettingsController(TestCo
 

	
 
    def test_ga_code_inactive(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = ''
 
        response = self.app.post(url('admin_settings_global'),
 
        response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
 
@@ -153,55 +153,55 @@ class TestAdminSettingsController(TestCo
 

	
 
    def test_captcha_activate(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = ''
 
        response = self.app.post(url('admin_settings_global'),
 
        response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='1234567890',
 
                                 captcha_public_key='1234567890',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        assert Setting.get_app_settings()['captcha_private_key'] == '1234567890'
 

	
 
        response = self.app.get(url('register'))
 
        response = self.app.get(base.url('register'))
 
        response.mustcontain('captcha')
 

	
 
    def test_captcha_deactivate(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = ''
 
        response = self.app.post(url('admin_settings_global'),
 
        response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='',
 
                                 captcha_public_key='1234567890',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        assert Setting.get_app_settings()['captcha_private_key'] == ''
 

	
 
        response = self.app.get(url('register'))
 
        response = self.app.get(base.url('register'))
 
        response.mustcontain(no=['captcha'])
 

	
 
    def test_title_change(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        new_title = old_title + '_changed'
 
        old_realm = 'Kallithea authentication'
 

	
 
        for new_title in ['Changed', 'Żółwik', old_title]:
 
            response = self.app.post(url('admin_settings_global'),
 
            response = self.app.post(base.url('admin_settings_global'),
 
                        params=dict(title=new_title,
 
                                 realm=old_realm,
 
                                 ga_code='',
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _session_csrf_secret_token=self.session_csrf_secret_token(),
kallithea/tests/functional/test_admin_user_groups.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from kallithea.model.db import Permission, UserGroup, UserGroupToPerm
 
from kallithea.model.meta import Session
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
TEST_USER_GROUP = u'admins_test'
 

	
 

	
 
class TestAdminUsersGroupsController(TestController):
 
class TestAdminUsersGroupsController(base.TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users_groups'))
 
        response = self.app.get(base.url('users_groups'))
 
        # Test response...
 

	
 
    def test_create(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP
 
        response = self.app.post(url('users_groups'),
 
        response = self.app.post(base.url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response.follow()
 

	
 
        self.checkSessionFlash(response,
 
                               'Created user group <a href="/_admin/user_groups/')
 
        self.checkSessionFlash(response,
 
                               '/edit">%s</a>' % TEST_USER_GROUP)
 

	
 
    def test_new(self):
 
        response = self.app.get(url('new_users_group'))
 
        response = self.app.get(base.url('new_users_group'))
 

	
 
    def test_update(self):
 
        response = self.app.post(url('update_users_group', id=1), status=403)
 
        response = self.app.post(base.url('update_users_group', id=1), status=403)
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('update_users_group', id=1),
 
        response = self.app.post(base.url('update_users_group', id=1),
 
                                 params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another'
 
        response = self.app.post(url('users_groups'),
 
        response = self.app.post(base.url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response.follow()
 

	
 
        self.checkSessionFlash(response,
 
                               'Created user group ')
 

	
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).one()
 

	
 
        response = self.app.post(url('delete_users_group', id=gr.users_group_id),
 
        response = self.app.post(base.url('delete_users_group', id=gr.users_group_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).scalar()
 

	
 
        assert gr is None
 

	
 
    def test_default_perms_enable_repository_read_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
        response = self.app.post(base.url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response.follow()
 

	
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        self.checkSessionFlash(response,
 
                               'Created user group ')
 
        ## ENABLE REPO CREATE ON A GROUP
 
        response = self.app.post(url('edit_user_group_default_perms_update',
 
        response = self.app.post(base.url('edit_user_group_default_perms_update',
 
                                     id=ug.users_group_id),
 
                                 {'create_repo_perm': True,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.repository')
 
@@ -94,13 +94,13 @@ class TestAdminUsersGroupsController(Tes
 
        assert sorted([[x.users_group_id, x.permission_id, ] for x in perms]) == sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]])
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.post(
 
            url('edit_user_group_default_perms_update', id=ug.users_group_id),
 
            base.url('edit_user_group_default_perms_update', id=ug.users_group_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
@@ -115,13 +115,13 @@ class TestAdminUsersGroupsController(Tes
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]])
 

	
 
        # DELETE !
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        ugid = ug.users_group_id
 
        response = self.app.post(url('delete_users_group', id=ug.users_group_id),
 
        response = self.app.post(base.url('delete_users_group', id=ug.users_group_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response = response.follow()
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).scalar()
 

	
 
        assert gr is None
 
@@ -132,24 +132,24 @@ class TestAdminUsersGroupsController(Tes
 
                  x.permission_id, ] for x in perms]
 
        assert perms == []
 

	
 
    def test_default_perms_enable_repository_fork_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
        response = self.app.post(base.url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response.follow()
 

	
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        self.checkSessionFlash(response,
 
                               'Created user group ')
 
        ## ENABLE REPO CREATE ON A GROUP
 
        response = self.app.post(url('edit_user_group_default_perms_update',
 
        response = self.app.post(base.url('edit_user_group_default_perms_update',
 
                                     id=ug.users_group_id),
 
                                 {'fork_repo_perm': True, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
@@ -162,13 +162,13 @@ class TestAdminUsersGroupsController(Tes
 

	
 
        assert sorted([[x.users_group_id, x.permission_id, ] for x in perms]) == sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]])
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.post(url('edit_user_group_default_perms_update', id=ug.users_group_id),
 
        response = self.app.post(base.url('edit_user_group_default_perms_update', id=ug.users_group_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
@@ -182,13 +182,13 @@ class TestAdminUsersGroupsController(Tes
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]])
 

	
 
        # DELETE !
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        ugid = ug.users_group_id
 
        response = self.app.post(url('delete_users_group', id=ug.users_group_id),
 
        response = self.app.post(base.url('delete_users_group', id=ug.users_group_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response = response.follow()
 
        gr = Session().query(UserGroup) \
 
                           .filter(UserGroup.users_group_name ==
 
                                   users_group_name).scalar()
 

	
 
@@ -198,8 +198,8 @@ class TestAdminUsersGroupsController(Tes
 
            .filter(UserGroupToPerm.users_group_id == ugid).all()
 
        perms = [[x.users_group_id,
 
                  x.permission_id, ] for x in perms]
 
        assert perms == []
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('delete_users_group', id=1),
 
        response = self.app.post(base.url('delete_users_group', id=1),
 
                                 params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
@@ -21,13 +21,13 @@ from kallithea.controllers.admin.users i
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import check_password
 
from kallithea.model import validators
 
from kallithea.model.db import Permission, RepoGroup, User, UserApiKeys, UserSshKeys
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
@@ -40,36 +40,36 @@ def user_and_repo_group_fail():
 
    yield user, repo_group
 
    # cleanup
 
    if RepoGroup.get_by_group_name(groupname):
 
        fixture.destroy_repo_group(repo_group)
 

	
 

	
 
class TestAdminUsersController(TestController):
 
class TestAdminUsersController(base.TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users'))
 
        response = self.app.get(base.url('users'))
 
        # TODO: Test response...
 

	
 
    def test_create(self):
 
        self.log_user()
 
        username = 'newtestuser'
 
        password = 'test12'
 
        password_confirmation = password
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'mail@example.com'
 

	
 
        response = self.app.post(url('new_user'),
 
        response = self.app.post(base.url('new_user'),
 
            {'username': username,
 
             'password': password,
 
             'password_confirmation': password_confirmation,
 
             'firstname': name,
 
             'active': True,
 
             'lastname': lastname,
 
@@ -99,13 +99,13 @@ class TestAdminUsersController(TestContr
 
        username = 'new_user'
 
        password = ''
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'errmail.example.com'
 

	
 
        response = self.app.post(url('new_user'),
 
        response = self.app.post(base.url('new_user'),
 
            {'username': username,
 
             'password': password,
 
             'name': name,
 
             'active': False,
 
             'lastname': lastname,
 
             'email': email,
 
@@ -123,15 +123,15 @@ class TestAdminUsersController(TestContr
 

	
 
        with pytest.raises(NoResultFound):
 
            get_user(), 'found user in database'
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_user'))
 
        response = self.app.get(base.url('new_user'))
 

	
 
    @parametrize('name,attrs',
 
    @base.parametrize('name,attrs',
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
@@ -164,13 +164,13 @@ class TestAdminUsersController(TestContr
 
            # cannot update this via form, expected value is original one
 
            params['extern_name'] = self.test_user_1
 
            # special case since this user is not logged in yet his data is
 
            # not filled so we use creation data
 

	
 
        params.update({'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        response = self.app.post(url('update_user', id=usr.user_id), params)
 
        response = self.app.post(base.url('update_user', id=usr.user_id), params)
 
        self.checkSessionFlash(response, 'User updated successfully')
 
        params.pop('_session_csrf_secret_token')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
@@ -183,13 +183,13 @@ class TestAdminUsersController(TestContr
 
        username = 'newtestuserdeleteme'
 

	
 
        fixture.create_user(name=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
        response = self.app.post(base.url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_err(self):
 
        self.log_user()
 
@@ -198,50 +198,50 @@ class TestAdminUsersController(TestContr
 

	
 
        fixture.create_user(name=username)
 
        fixture.create_repo(name=reponame, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
        response = self.app.post(base.url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'User &quot;%s&quot; still '
 
                               'owns 1 repositories and cannot be removed. '
 
                               'Switch owners or remove those repositories: '
 
                               '%s' % (username, reponame))
 

	
 
        response = self.app.post(url('delete_repo', repo_name=reponame),
 
        response = self.app.post(base.url('delete_repo', repo_name=reponame),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Deleted repository %s' % reponame)
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
        response = self.app.post(base.url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_group_err(self, user_and_repo_group_fail):
 
        new_user, repo_group = user_and_repo_group_fail
 
        username = new_user.username
 
        groupname = repo_group.group_name
 

	
 
        self.log_user()
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
        response = self.app.post(base.url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'User &quot;%s&quot; still '
 
                               'owns 1 repository groups and cannot be removed. '
 
                               'Switch owners or remove those repository groups: '
 
                               '%s' % (username, groupname))
 

	
 
        # Relevant _if_ the user deletion succeeded to make sure we can render groups without owner
 
        # rg = RepoGroup.get_by_group_name(group_name=groupname)
 
        # response = self.app.get(url('repos_groups', id=rg.group_id))
 
        # response = self.app.get(base.url('repos_groups', id=rg.group_id))
 

	
 
        response = self.app.post(url('delete_repo_group', group_name=groupname),
 
        response = self.app.post(base.url('delete_repo_group', group_name=groupname),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Removed repository group %s' % groupname)
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
        response = self.app.post(base.url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_user_group_err(self):
 
        self.log_user()
 
        username = 'usergrouperr'
 
@@ -249,33 +249,33 @@ class TestAdminUsersController(TestContr
 

	
 
        fixture.create_user(name=username)
 
        ug = fixture.create_user_group(name=groupname, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
        response = self.app.post(base.url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'User &quot;%s&quot; still '
 
                               'owns 1 user groups and cannot be removed. '
 
                               'Switch owners or remove those user groups: '
 
                               '%s' % (username, groupname))
 

	
 
        # TODO: why do this fail?
 
        #response = self.app.delete(url('delete_users_group', id=groupname))
 
        #response = self.app.delete(base.url('delete_users_group', id=groupname))
 
        #self.checkSessionFlash(response, 'Removed user group %s' % groupname)
 

	
 
        fixture.destroy_user_group(ug.users_group_id)
 

	
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
        response = self.app.post(base.url('delete_user', id=new_user.user_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_edit(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        response = self.app.get(url('edit_user', id=user.user_id))
 
        user = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        response = self.app.get(base.url('edit_user', id=user.user_id))
 

	
 
    def test_add_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
        perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
@@ -287,13 +287,13 @@ class TestAdminUsersController(TestContr
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_create) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
            response = self.app.post(base.url('edit_user_perms_update', id=uid),
 
                                     params=dict(create_repo_perm=True,
 
                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
@@ -317,13 +317,13 @@ class TestAdminUsersController(TestContr
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_create) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
            response = self.app.post(base.url('edit_user_perms_update', id=uid),
 
                                     params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
@@ -346,13 +346,13 @@ class TestAdminUsersController(TestContr
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
            response = self.app.post(base.url('edit_user_perms_update', id=uid),
 
                                     params=dict(create_repo_perm=True,
 
                                                 _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
@@ -376,13 +376,13 @@ class TestAdminUsersController(TestContr
 

	
 
        try:
 
            # User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
            response = self.app.post(base.url('edit_user_perms_update', id=uid),
 
                                     params=dict(_session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            # User should have None permission on creation repository
 
@@ -391,85 +391,85 @@ class TestAdminUsersController(TestContr
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_ips(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_ips', id=user.user_id))
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(base.url('edit_user_ips', id=user.user_id))
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    @parametrize('test_name,ip,ip_range,failure', [
 
    @base.parametrize('test_name,ip,ip_range,failure', [
 
        ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
 
        ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
 
        ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
 
        ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
 
        ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
 
        ('127_bad_ip', 'foobar', 'foobar', True),
 
    ])
 
    def test_add_ip(self, test_name, ip, ip_range, failure, auto_clear_ip_permissions):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ips_update', id=user_id),
 
        response = self.app.post(base.url('edit_user_ips_update', id=user_id),
 
                                 params=dict(new_ip=ip, _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        if failure:
 
            self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response = self.app.get(base.url('edit_user_ips', id=user_id))
 
            response.mustcontain(no=[ip])
 
            response.mustcontain(no=[ip_range])
 

	
 
        else:
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response = self.app.get(base.url('edit_user_ips', id=user_id))
 
            response.mustcontain(ip)
 
            response.mustcontain(ip_range)
 

	
 
    def test_delete_ip(self, auto_clear_ip_permissions):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        ip = '127.0.0.1/32'
 
        ip_range = '127.0.0.1 - 127.0.0.1'
 
        with test_context(self.app):
 
            new_ip = UserModel().add_extra_ip(user_id, ip)
 
            Session().commit()
 
        new_ip_id = new_ip.ip_id
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response = self.app.get(base.url('edit_user_ips', id=user_id))
 
        response.mustcontain(ip)
 
        response.mustcontain(ip_range)
 

	
 
        self.app.post(url('edit_user_ips_delete', id=user_id),
 
        self.app.post(base.url('edit_user_ips_delete', id=user_id),
 
                      params=dict(del_ip_id=new_ip_id, _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response = self.app.get(base.url('edit_user_ips', id=user_id))
 
        response.mustcontain('All IP addresses are allowed')
 
        response.mustcontain(no=[ip])
 
        response.mustcontain(no=[ip_range])
 

	
 
    def test_api_keys(self):
 
        self.log_user()
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id))
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(base.url('edit_user_api_keys', id=user.user_id))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @parametrize('desc,lifetime', [
 
    @base.parametrize('desc,lifetime', [
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_add_api_keys(self, desc, lifetime):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys_update', id=user_id),
 
        response = self.app.post(base.url('edit_user_api_keys_update', id=user_id),
 
                 {'description': desc, 'lifetime': lifetime, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(user_id)
 
            for api_key in user.api_keys:
 
@@ -478,55 +478,55 @@ class TestAdminUsersController(TestContr
 
            for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_remove_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys_update', id=user_id),
 
        response = self.app.post(base.url('edit_user_api_keys_update', id=user_id),
 
                {'description': 'desc', 'lifetime': -1, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        # now delete our key
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 1 == len(keys)
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
        response = self.app.post(base.url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key': keys[0].api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 0 == len(keys)
 

	
 
    def test_reset_main_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        api_key = user.api_key
 
        response = self.app.get(url('edit_user_api_keys', id=user_id))
 
        response = self.app.get(base.url('edit_user_api_keys', id=user_id))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
        response = self.app.post(base.url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key_builtin': api_key, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
 

	
 
    def test_add_ssh_key(self):
 
        description = u'something'
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ssh_keys', id=user_id),
 
        response = self.app.post(base.url('edit_user_ssh_keys', id=user_id),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 

	
 
        response = response.follow()
 
@@ -540,114 +540,114 @@ class TestAdminUsersController(TestContr
 
    def test_remove_ssh_key(self):
 
        description = u''
 
        public_key = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== me@localhost'
 
        fingerprint = u'Ke3oUCNJM87P0jJTb3D+e3shjceP2CqMpQKVd75E9I8'
 

	
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ssh_keys', id=user_id),
 
        response = self.app.post(base.url('edit_user_ssh_keys', id=user_id),
 
                                 {'description': description,
 
                                  'public_key': public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key %s successfully added' % fingerprint)
 
        response.follow()
 
        ssh_key = UserSshKeys.query().filter(UserSshKeys.user_id == user_id).one()
 
        assert ssh_key.description == u'me@localhost'
 

	
 
        response = self.app.post(url('edit_user_ssh_keys_delete', id=user_id),
 
        response = self.app.post(base.url('edit_user_ssh_keys_delete', id=user_id),
 
                                 {'del_public_key': ssh_key.public_key,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'SSH key successfully deleted')
 
        keys = UserSshKeys.query().all()
 
        assert 0 == len(keys)
 

	
 

	
 
class TestAdminUsersController_unittest(TestController):
 
class TestAdminUsersController_unittest(base.TestController):
 
    """ Unit tests for the users controller """
 

	
 
    def test_get_user_or_raise_if_default(self, monkeypatch, test_context_fixture):
 
        # flash complains about an non-existing session
 
        def flash_mock(*args, **kwargs):
 
            pass
 
        monkeypatch.setattr(h, 'flash', flash_mock)
 

	
 
        u = UsersController()
 
        # a regular user should work correctly
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        assert u._get_user_or_raise_if_default(user.user_id) == user
 
        # the default user should raise
 
        with pytest.raises(HTTPNotFound):
 
            u._get_user_or_raise_if_default(User.get_default_user().user_id)
 

	
 

	
 
class TestAdminUsersControllerForDefaultUser(TestController):
 
class TestAdminUsersControllerForDefaultUser(base.TestController):
 
    """
 
    Edit actions on the default user are not allowed.
 
    Validate that they throw a 404 exception.
 
    """
 
    def test_edit_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user', id=user.user_id), status=404)
 
        response = self.app.get(base.url('edit_user', id=user.user_id), status=404)
 

	
 
    def test_edit_advanced_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
 
        response = self.app.get(base.url('edit_user_advanced', id=user.user_id), status=404)
 

	
 
    # API keys
 
    def test_edit_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
 
        response = self.app.get(base.url('edit_user_api_keys', id=user.user_id), status=404)
 

	
 
    def test_add_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_update', id=user.user_id),
 
        response = self.app.post(base.url('edit_user_api_keys_update', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    def test_delete_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user.user_id),
 
        response = self.app.post(base.url('edit_user_api_keys_delete', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    # Permissions
 
    def test_edit_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
 
        response = self.app.get(base.url('edit_user_perms', id=user.user_id), status=404)
 

	
 
    def test_update_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_perms_update', id=user.user_id),
 
        response = self.app.post(base.url('edit_user_perms_update', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    # Emails
 
    def test_edit_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
 
        response = self.app.get(base.url('edit_user_emails', id=user.user_id), status=404)
 

	
 
    def test_add_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails_update', id=user.user_id),
 
        response = self.app.post(base.url('edit_user_emails_update', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    def test_delete_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails_delete', id=user.user_id),
 
        response = self.app.post(base.url('edit_user_emails_delete', id=user.user_id),
 
                 {'_session_csrf_secret_token': self.session_csrf_secret_token()}, status=404)
 

	
 
    # IP addresses
 
    # Add/delete of IP addresses for the default user is used to maintain
 
    # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
 
    def test_edit_ip_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
 
        response = self.app.get(base.url('edit_user_ips', id=user.user_id), status=404)
kallithea/tests/functional/test_changelog.py
Show inline comments
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
class TestChangelogController(TestController):
 
class TestChangelogController(base.TestController):
 

	
 
    def test_index_hg(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO))
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.HG_REPO))
 

	
 
        response.mustcontain('''id="chg_20" class="mergerow"''')
 
        response.mustcontain(
 
            """<input class="changeset_range" """
 
            """id="7b22a518347bb9bc19679f6af07cd0a61bfe16e7" """
 
            """name="7b22a518347bb9bc19679f6af07cd0a61bfe16e7" """
 
            """type="checkbox" value="1" />"""
 
        )
 
        # rev 640: code garden
 
        response.mustcontain(
 
            """<a class="changeset_hash" href="/%s/changeset/0a4e54a4460401d6dbbd6a3604b17cd2b3606b82">r640:0a4e54a44604</a>""" % HG_REPO
 
            """<a class="changeset_hash" href="/%s/changeset/0a4e54a4460401d6dbbd6a3604b17cd2b3606b82">r640:0a4e54a44604</a>""" % base.HG_REPO
 
        )
 
        response.mustcontain("""code garden""")
 

	
 
        response.mustcontain("""var jsdata = ([[[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 1, 2, 0], [0, 0, 1, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 1, 0], [1, 1, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 1], [[0, 0, 2, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 3, 0], [0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 3, 0]], 0, 0, 0, 0, 0, 0], [[1, 3], [[0, 0, 2, 0], [1, 1, 3, 0]], 0, 0, 0, 0, 0, 0], [[1, 3], [[0, 0, 2, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 4, 0], [0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[1, 4], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[1, 4], [[0, 0, 2, 0], [1, 1, 4, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[1, 4], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[1, 4], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[1, 4], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[1, 4], [[0, 0, 2, 0], [1, 1, 4, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[1, 4], [[0, 0, 2, 0], [1, 1, 4, 0]], 0, 0, 0, 0, 0, 0], [[1, 4], [[0, 0, 2, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 5, 0], [0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 5, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 5, 0]], 0, 0, 0, 0, 0, 0], [[1, 5], [[0, 0, 2, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 6, 0], [0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 6, 0]], 0, 0, 0, 0, 0, 0], [[1, 6], [[0, 0, 2, 0], [1, 1, 6, 0]], 0, 0, 0, 0, 0, 0], [[1, 6], [[0, 0, 2, 0], [1, 1, 6, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 6, 0]], 0, 0, 0, 0, 0, 0], [[1, 6], [[0, 0, 2, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 7, 0], [0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 7, 0]], 0, 0, 0, 0, 0, 0], [[1, 7], [[0, 0, 2, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 8, 0], [0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 8, 0]], 0, 0, 0, 0, 0, 0], [[1, 8], [[0, 0, 2, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 9, 0], [0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 10, 0], [0, 0, 2, 0], [1, 2, 9, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 10, 0], [2, 2, 9, 0]], 0, 0, 0, 0, 0, 0], [[2, 9], [[0, 0, 2, 0], [1, 1, 10, 0], [2, 1, 10, 0]], 0, 0, 0, 0, 0, 0], [[1, 10], [[0, 0, 2, 0], [1, 1, 10, 0]], 0, 0, 0, 0, 0, 0], [[1, 10], [[0, 0, 2, 0], [1, 1, 10, 0]], 0, 0, 0, 0, 0, 0], [[1, 10], [[0, 0, 2, 0], [1, 1, 10, 0]], 0, 0, 0, 0, 0, 0], [[1, 10], [[0, 0, 2, 0], [1, 1, 10, 0]], 0, 0, 0, 0, 0, 0], [[1, 10], [[0, 0, 2, 0], [1, 1, 10, 0]], 0, 0, 0, 0, 0, 0], [[1, 10], [[0, 0, 2, 0], [1, 1, 10, 0]], 0, 0, 0, 0, 0, 0], [[1, 10], [[0, 0, 2, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[1, 11], [[0, 0, 2, 0], [1, 1, 11, 0]], 1, 0, 0, 0, 0, 0], [[2, 12], [[0, 0, 2, 0], [2, 1, 12, 0]], 1, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 13, 0], [0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 13, 0]], 0, 0, 0, 0, 0, 0], [[1, 13], [[0, 0, 2, 0], [1, 1, 13, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 13, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 13, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0], [1, 1, 13, 0]], 0, 0, 0, 0, 0, 0], [[1, 13], [[0, 0, 2, 0], [1, 1, 13, 0], [1, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 1, 14, 0], [0, 0, 2, 0]], 0, 0, 0, 0, 0, 0], [[0, 2], [[0, 0, 2, 0]], 0, 0, 0, 0, 0, 0]]);""")
 

	
 
    def test_index_pagination_hg(self):
 
        self.log_user()
 
        # pagination
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 1})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 2})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 3})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 4})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 5})
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 6, 'size': 20})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.HG_REPO), {'page': 1})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.HG_REPO), {'page': 2})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.HG_REPO), {'page': 3})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.HG_REPO), {'page': 4})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.HG_REPO), {'page': 5})
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.HG_REPO), {'page': 6, 'size': 20})
 

	
 
        # Test response after pagination...
 
        response.mustcontain(
 
            """<input class="changeset_range" """
 
            """id="22baf968d547386b9516965ce89d189665003a31" """
 
            """name="22baf968d547386b9516965ce89d189665003a31" """
 
@@ -50,14 +50,14 @@ class TestChangelogController(TestContro
 
        response.mustcontain(
 
            """<a class="changeset_hash" href="/vcs_test_hg/changeset/22baf968d547386b9516965ce89d189665003a31">r539:22baf968d547</a>"""
 
        )
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO))
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.GIT_REPO))
 

	
 
        response.mustcontain('''id="chg_20" class=""''') # why no mergerow for git?
 
        response.mustcontain(
 
            """<input class="changeset_range" """
 
            """id="95f9a91d775b0084b2368ae7779e44931c849c0e" """
 
            """name="95f9a91d775b0084b2368ae7779e44931c849c0e" """
 
@@ -79,24 +79,24 @@ class TestChangelogController(TestContro
 
#            """more details">3</div>"""
 
#        )
 

	
 
    def test_index_pagination_git(self):
 
        self.log_user()
 
        # pagination
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 1})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 2})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 3})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 4})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 5})
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 6, 'size': 20})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.GIT_REPO), {'page': 1})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.GIT_REPO), {'page': 2})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.GIT_REPO), {'page': 3})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.GIT_REPO), {'page': 4})
 
        self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.GIT_REPO), {'page': 5})
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    repo_name=base.GIT_REPO), {'page': 6, 'size': 20})
 

	
 
        # Test response after pagination...
 
        response.mustcontain(
 
            """<input class="changeset_range" """
 
            """id="636ed213f2f11ef91071b9c24f2d5e6bd01a6ed5" """
 
            """name="636ed213f2f11ef91071b9c24f2d5e6bd01a6ed5" """
 
@@ -106,52 +106,52 @@ class TestChangelogController(TestContro
 
        response.mustcontain(
 
            """<a class="changeset_hash" href="/vcs_test_git/changeset/636ed213f2f11ef91071b9c24f2d5e6bd01a6ed5">r515:636ed213f2f1</a>"""
 
        )
 

	
 
    def test_index_hg_with_filenode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/vcs/exceptions.py',
 
                                    repo_name=HG_REPO))
 
                                    repo_name=base.HG_REPO))
 
        # history commits messages
 
        response.mustcontain('Added exceptions module, this time for real')
 
        response.mustcontain('Added not implemented hg backend test case')
 
        response.mustcontain('Added BaseChangeset class')
 
        # Test response...
 

	
 
    def test_index_git_with_filenode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/vcs/exceptions.py',
 
                                    repo_name=GIT_REPO))
 
                                    repo_name=base.GIT_REPO))
 
        # history commits messages
 
        response.mustcontain('Added exceptions module, this time for real')
 
        response.mustcontain('Added not implemented hg backend test case')
 
        response.mustcontain('Added BaseChangeset class')
 

	
 
    def test_index_hg_with_filenode_that_is_dirnode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/tests',
 
                                    repo_name=HG_REPO))
 
                                    repo_name=base.HG_REPO))
 
        assert response.status == '302 Found'
 

	
 
    def test_index_git_with_filenode_that_is_dirnode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/tests',
 
                                    repo_name=GIT_REPO))
 
                                    repo_name=base.GIT_REPO))
 
        assert response.status == '302 Found'
 

	
 
    def test_index_hg_with_filenode_not_existing(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/wrong_path',
 
                                    repo_name=HG_REPO))
 
                                    repo_name=base.HG_REPO))
 
        assert response.status == '302 Found'
 

	
 
    def test_index_git_with_filenode_not_existing(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
        response = self.app.get(base.url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/wrong_path',
 
                                    repo_name=GIT_REPO))
 
                                    repo_name=base.GIT_REPO))
 
        assert response.status == '302 Found'
kallithea/tests/functional/test_changeset.py
Show inline comments
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
class TestChangesetController(TestController):
 
class TestChangesetController(base.TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                    repo_name=HG_REPO, revision='tip'))
 
        response = self.app.get(base.url(controller='changeset', action='index',
 
                                    repo_name=base.HG_REPO, revision='tip'))
 
        # Test response...
 

	
 
    def test_changeset_range(self):
 
        #print self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO))
 
        #print self.app.get(base.url(controller='changelog', action='index', repo_name=base.HG_REPO))
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                    repo_name=HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52...96507bd11ecc815ebc6270fdf6db110928c09c1e'))
 
        response = self.app.get(base.url(controller='changeset', action='index',
 
                                    repo_name=base.HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52...96507bd11ecc815ebc6270fdf6db110928c09c1e'))
 

	
 
        response = self.app.get(url(controller='changeset', action='changeset_raw',
 
                                    repo_name=HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52'))
 
        response = self.app.get(base.url(controller='changeset', action='changeset_raw',
 
                                    repo_name=base.HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52'))
 

	
 
        response = self.app.get(url(controller='changeset', action='changeset_patch',
 
                                    repo_name=HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52'))
 
        response = self.app.get(base.url(controller='changeset', action='changeset_patch',
 
                                    repo_name=base.HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52'))
 

	
 
        response = self.app.get(url(controller='changeset', action='changeset_download',
 
                                    repo_name=HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52'))
 
        response = self.app.get(base.url(controller='changeset', action='changeset_download',
 
                                    repo_name=base.HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52'))
kallithea/tests/functional/test_changeset_pullrequests_comments.py
Show inline comments
 
import re
 

	
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.db import ChangesetComment, PullRequest
 
from kallithea.model.meta import Session
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
class TestChangeSetCommentsController(TestController):
 
class TestChangeSetCommentsController(base.TestController):
 

	
 
    def setup_method(self, method):
 
        for x in ChangesetComment.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
    def test_create(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'general comment on changeset'
 

	
 
        params = {'text': text, '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
        response = self.app.post(base.url(controller='changeset', action='comment',
 
                                     repo_name=base.HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        response = self.app.get(base.url(controller='changeset', action='index',
 
                                repo_name=base.HG_REPO, revision=rev))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 
        response.mustcontain(text)
 

	
 
@@ -41,20 +41,20 @@ class TestChangeSetCommentsController(Te
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'inline comment on changeset'
 
        f_path = 'vcs/web/simplevcs/views/repository.py'
 
        line = 'n1'
 

	
 
        params = {'text': text, 'f_path': f_path, 'line': line, '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
        response = self.app.post(base.url(controller='changeset', action='comment',
 
                                     repo_name=base.HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        response = self.app.get(base.url(controller='changeset', action='index',
 
                                repo_name=base.HG_REPO, revision=rev))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (1 inline, 0 general)'''
 
        )
 
        response.mustcontain(
 
            '''<div class="comments-list-chunk" '''
 
@@ -67,104 +67,104 @@ class TestChangeSetCommentsController(Te
 
        assert ChangesetComment.query().count() == 1
 

	
 
    def test_create_with_mention(self):
 
        self.log_user()
 

	
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'@%s check CommentOnRevision' % TEST_USER_REGULAR_LOGIN
 
        text = u'@%s check CommentOnRevision' % base.TEST_USER_REGULAR_LOGIN
 

	
 
        params = {'text': text, '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
        response = self.app.post(base.url(controller='changeset', action='comment',
 
                                     repo_name=base.HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        response = self.app.get(base.url(controller='changeset', action='index',
 
                                repo_name=base.HG_REPO, revision=rev))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 
        response.mustcontain('<b>@%s</b> check CommentOnRevision' % TEST_USER_REGULAR_LOGIN)
 
        response.mustcontain('<b>@%s</b> check CommentOnRevision' % base.TEST_USER_REGULAR_LOGIN)
 

	
 
        # test DB
 
        assert ChangesetComment.query().count() == 1
 

	
 
    def test_create_status_change(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'general comment on changeset'
 

	
 
        params = {'text': text, 'changeset_status': 'rejected',
 
                '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
        response = self.app.post(base.url(controller='changeset', action='comment',
 
                                     repo_name=base.HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        response = self.app.get(base.url(controller='changeset', action='index',
 
                                repo_name=base.HG_REPO, revision=rev))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 
        response.mustcontain(text)
 

	
 
        # test DB
 
        assert ChangesetComment.query().count() == 1
 

	
 
        # check status
 
        status = ChangesetStatusModel().get_status(repo=HG_REPO, revision=rev)
 
        status = ChangesetStatusModel().get_status(repo=base.HG_REPO, revision=rev)
 
        assert status == 'rejected'
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'general comment on changeset to be deleted'
 

	
 
        params = {'text': text, '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
        response = self.app.post(base.url(controller='changeset', action='comment',
 
                                     repo_name=base.HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 

	
 
        comments = ChangesetComment.query().all()
 
        assert len(comments) == 1
 
        comment_id = comments[0].comment_id
 

	
 
        self.app.post(url("changeset_comment_delete",
 
                                    repo_name=HG_REPO,
 
        self.app.post(base.url("changeset_comment_delete",
 
                                    repo_name=base.HG_REPO,
 
                                    comment_id=comment_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        comments = ChangesetComment.query().all()
 
        assert len(comments) == 0
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        response = self.app.get(base.url(controller='changeset', action='index',
 
                                repo_name=base.HG_REPO, revision=rev))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 0 comments (0 inline, 0 general)'''
 
        )
 
        response.mustcontain(no=text)
 

	
 

	
 
class TestPullrequestsCommentsController(TestController):
 
class TestPullrequestsCommentsController(base.TestController):
 

	
 
    def setup_method(self, method):
 
        for x in ChangesetComment.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
    def _create_pr(self):
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
        response = self.app.post(base.url(controller='pullrequests', action='create',
 
                                     repo_name=base.HG_REPO),
 
                                 {'org_repo': base.HG_REPO,
 
                                  'org_ref': 'branch:stable:4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'other_repo': HG_REPO,
 
                                  'other_repo': base.HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token(),
 
                                 },
 
                                 status=302)
 
@@ -174,20 +174,20 @@ class TestPullrequestsCommentsController
 
    def test_create(self):
 
        self.log_user()
 
        pr_id = self._create_pr()
 

	
 
        text = u'general comment on pullrequest'
 
        params = {'text': text, '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='pullrequests', action='comment',
 
                                     repo_name=HG_REPO, pull_request_id=pr_id),
 
        response = self.app.post(base.url(controller='pullrequests', action='comment',
 
                                     repo_name=base.HG_REPO, pull_request_id=pr_id),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='pullrequests', action='show',
 
                                repo_name=HG_REPO, pull_request_id=pr_id, extra=''))
 
        response = self.app.get(base.url(controller='pullrequests', action='show',
 
                                repo_name=base.HG_REPO, pull_request_id=pr_id, extra=''))
 
        # PRs currently always have an initial 'Under Review' status change
 
        # that counts as a general comment, hence '2' in the test below. That
 
        # could be counted as a misfeature, to be reworked later.
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 2 comments (0 inline, 2 general)'''
 
@@ -202,20 +202,20 @@ class TestPullrequestsCommentsController
 
        pr_id = self._create_pr()
 

	
 
        text = u'inline comment on changeset'
 
        f_path = 'vcs/web/simplevcs/views/repository.py'
 
        line = 'n1'
 
        params = {'text': text, 'f_path': f_path, 'line': line, '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='pullrequests', action='comment',
 
                                     repo_name=HG_REPO, pull_request_id=pr_id),
 
        response = self.app.post(base.url(controller='pullrequests', action='comment',
 
                                     repo_name=base.HG_REPO, pull_request_id=pr_id),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='pullrequests', action='show',
 
                                repo_name=HG_REPO, pull_request_id=pr_id, extra=''))
 
        response = self.app.get(base.url(controller='pullrequests', action='show',
 
                                repo_name=base.HG_REPO, pull_request_id=pr_id, extra=''))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 2 comments (1 inline, 1 general)'''
 
        )
 
        response.mustcontain(
 
            '''<div class="comments-list-chunk" '''
 
@@ -228,46 +228,46 @@ class TestPullrequestsCommentsController
 
        assert ChangesetComment.query().count() == 2
 

	
 
    def test_create_with_mention(self):
 
        self.log_user()
 
        pr_id = self._create_pr()
 

	
 
        text = u'@%s check CommentOnRevision' % TEST_USER_REGULAR_LOGIN
 
        text = u'@%s check CommentOnRevision' % base.TEST_USER_REGULAR_LOGIN
 
        params = {'text': text, '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='pullrequests', action='comment',
 
                                     repo_name=HG_REPO, pull_request_id=pr_id),
 
        response = self.app.post(base.url(controller='pullrequests', action='comment',
 
                                     repo_name=base.HG_REPO, pull_request_id=pr_id),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='pullrequests', action='show',
 
                                repo_name=HG_REPO, pull_request_id=pr_id, extra=''))
 
        response = self.app.get(base.url(controller='pullrequests', action='show',
 
                                repo_name=base.HG_REPO, pull_request_id=pr_id, extra=''))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 2 comments (0 inline, 2 general)'''
 
        )
 
        response.mustcontain('<b>@%s</b> check CommentOnRevision' % TEST_USER_REGULAR_LOGIN)
 
        response.mustcontain('<b>@%s</b> check CommentOnRevision' % base.TEST_USER_REGULAR_LOGIN)
 

	
 
        # test DB
 
        assert ChangesetComment.query().count() == 2
 

	
 
    def test_create_status_change(self):
 
        self.log_user()
 
        pr_id = self._create_pr()
 

	
 
        text = u'general comment on pullrequest'
 
        params = {'text': text, 'changeset_status': 'rejected',
 
                '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='pullrequests', action='comment',
 
                                     repo_name=HG_REPO, pull_request_id=pr_id),
 
        response = self.app.post(base.url(controller='pullrequests', action='comment',
 
                                     repo_name=base.HG_REPO, pull_request_id=pr_id),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='pullrequests', action='show',
 
                                repo_name=HG_REPO, pull_request_id=pr_id, extra=''))
 
        response = self.app.get(base.url(controller='pullrequests', action='show',
 
                                repo_name=base.HG_REPO, pull_request_id=pr_id, extra=''))
 
        # PRs currently always have an initial 'Under Review' status change
 
        # that counts as a general comment, hence '2' in the test below. That
 
        # could be counted as a misfeature, to be reworked later.
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 2 comments (0 inline, 2 general)'''
 
@@ -275,39 +275,39 @@ class TestPullrequestsCommentsController
 
        response.mustcontain(text)
 

	
 
        # test DB
 
        assert ChangesetComment.query().count() == 2
 

	
 
        # check status
 
        status = ChangesetStatusModel().get_status(repo=HG_REPO, pull_request=pr_id)
 
        status = ChangesetStatusModel().get_status(repo=base.HG_REPO, pull_request=pr_id)
 
        assert status == 'rejected'
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        pr_id = self._create_pr()
 

	
 
        text = u'general comment on changeset to be deleted'
 
        params = {'text': text, '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='pullrequests', action='comment',
 
                                     repo_name=HG_REPO, pull_request_id=pr_id),
 
        response = self.app.post(base.url(controller='pullrequests', action='comment',
 
                                     repo_name=base.HG_REPO, pull_request_id=pr_id),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 

	
 
        comments = ChangesetComment.query().all()
 
        assert len(comments) == 2
 
        comment_id = comments[-1].comment_id
 

	
 
        self.app.post(url("pullrequest_comment_delete",
 
                                    repo_name=HG_REPO,
 
        self.app.post(base.url("pullrequest_comment_delete",
 
                                    repo_name=base.HG_REPO,
 
                                    comment_id=comment_id),
 
            params={'_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        comments = ChangesetComment.query().all()
 
        assert len(comments) == 1
 

	
 
        response = self.app.get(url(controller='pullrequests', action='show',
 
                                repo_name=HG_REPO, pull_request_id=pr_id, extra=''))
 
        response = self.app.get(base.url(controller='pullrequests', action='show',
 
                                repo_name=base.HG_REPO, pull_request_id=pr_id, extra=''))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 
        response.mustcontain(no=text)
 

	
 
@@ -315,20 +315,20 @@ class TestPullrequestsCommentsController
 
        self.log_user()
 
        pr_id = self._create_pr()
 

	
 
        text = u'general comment on pullrequest'
 
        params = {'text': text, 'save_close': 'close',
 
                '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='pullrequests', action='comment',
 
                                     repo_name=HG_REPO, pull_request_id=pr_id),
 
        response = self.app.post(base.url(controller='pullrequests', action='comment',
 
                                     repo_name=base.HG_REPO, pull_request_id=pr_id),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='pullrequests', action='show',
 
                                repo_name=HG_REPO, pull_request_id=pr_id, extra=''))
 
        response = self.app.get(base.url(controller='pullrequests', action='show',
 
                                repo_name=base.HG_REPO, pull_request_id=pr_id, extra=''))
 
        response.mustcontain(
 
            '''title (Closed)'''
 
        )
 
        response.mustcontain(text)
 

	
 
        # test DB
 
@@ -338,40 +338,40 @@ class TestPullrequestsCommentsController
 
        self.log_user()
 
        pr_id = self._create_pr()
 

	
 
        text = u'general comment on pullrequest'
 
        params = {'text': text, 'save_delete': 'delete',
 
                '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='pullrequests', action='comment',
 
                                     repo_name=HG_REPO, pull_request_id=pr_id),
 
        response = self.app.post(base.url(controller='pullrequests', action='comment',
 
                                     repo_name=base.HG_REPO, pull_request_id=pr_id),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='pullrequests', action='show',
 
                                repo_name=HG_REPO, pull_request_id=pr_id, extra=''), status=404)
 
        response = self.app.get(base.url(controller='pullrequests', action='show',
 
                                repo_name=base.HG_REPO, pull_request_id=pr_id, extra=''), status=404)
 

	
 
        # test DB
 
        assert PullRequest.get(pr_id) is None
 

	
 
    def test_delete_closed_pr(self):
 
        self.log_user()
 
        pr_id = self._create_pr()
 

	
 
        # first close
 
        text = u'general comment on pullrequest'
 
        params = {'text': text, 'save_close': 'close',
 
                '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='pullrequests', action='comment',
 
                                     repo_name=HG_REPO, pull_request_id=pr_id),
 
        response = self.app.post(base.url(controller='pullrequests', action='comment',
 
                                     repo_name=base.HG_REPO, pull_request_id=pr_id),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        assert response.status == '200 OK'
 

	
 
        # attempt delete, should fail
 
        params = {'text': text, 'save_delete': 'delete',
 
                '_session_csrf_secret_token': self.session_csrf_secret_token()}
 
        response = self.app.post(url(controller='pullrequests', action='comment',
 
                                     repo_name=HG_REPO, pull_request_id=pr_id),
 
        response = self.app.post(base.url(controller='pullrequests', action='comment',
 
                                     repo_name=base.HG_REPO, pull_request_id=pr_id),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'}, status=403)
 

	
 
        # verify that PR still exists, in closed state
 
        assert PullRequest.get(pr_id).status == PullRequest.STATUS_CLOSED
kallithea/tests/functional/test_compare.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
def _commit_ref(repo_name, sha, msg):
 
    return '''<div class="message-firstline"><a class="message-link" href="/%s/changeset/%s">%s</a></div>''' % (repo_name, sha, msg)
 

	
 

	
 
class TestCompareController(TestController):
 
class TestCompareController(base.TestController):
 

	
 
    def setup_method(self, method):
 
        self.r1_id = None
 
        self.r2_id = None
 

	
 
    def teardown_method(self, method):
 
@@ -27,13 +27,13 @@ class TestCompareController(TestControll
 
        Session.remove()
 

	
 
    def test_compare_forks_on_branch_extra_commits_hg(self):
 
        self.log_user()
 
        repo1 = fixture.create_repo(u'one', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 
        self.r1_id = repo1.repo_id
 
        # commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
@@ -50,13 +50,13 @@ class TestCompareController(TestControll
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='hg', parent=cs1)
 

	
 
        rev1 = 'default'
 
        rev2 = 'default'
 

	
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_repo=repo2.repo_name,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
@@ -78,13 +78,13 @@ class TestCompareController(TestControll
 
        response.mustcontain("""<a class="btn btn-default btn-sm" href="/%s/compare/branch@%s...branch@%s?other_repo=%s&amp;merge=True"><i class="icon-arrows-cw"></i>Swap</a>""" % (repo2.repo_name, rev1, rev2, repo1.repo_name))
 

	
 
    def test_compare_forks_on_branch_extra_commits_git(self):
 
        self.log_user()
 
        repo1 = fixture.create_repo(u'one-git', repo_type='git',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 
        self.r1_id = repo1.repo_id
 
        # commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='git',
 
                parent=None, newfile=True)
 

	
 
@@ -101,13 +101,13 @@ class TestCompareController(TestControll
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='git', parent=cs1)
 

	
 
        rev1 = 'master'
 
        rev2 = 'master'
 

	
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_repo=repo2.repo_name,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
@@ -130,13 +130,13 @@ class TestCompareController(TestControll
 

	
 
    def test_compare_forks_on_branch_extra_commits_origin_has_incoming_hg(self):
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo(u'one', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 

	
 
        self.r1_id = repo1.repo_id
 

	
 
        # commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
@@ -160,13 +160,13 @@ class TestCompareController(TestControll
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='hg', parent=cs1)
 

	
 
        rev1 = 'default'
 
        rev2 = 'default'
 

	
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_repo=repo2.repo_name,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
@@ -189,13 +189,13 @@ class TestCompareController(TestControll
 

	
 
    def test_compare_forks_on_branch_extra_commits_origin_has_incoming_git(self):
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo(u'one-git', repo_type='git',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 

	
 
        self.r1_id = repo1.repo_id
 

	
 
        # commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='git',
 
@@ -219,13 +219,13 @@ class TestCompareController(TestControll
 
                content='line1\nline2\nline3\n', message='commit3',
 
                vcs_type='git', parent=cs1)
 

	
 
        rev1 = 'master'
 
        rev2 = 'master'
 

	
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_repo=repo2.repo_name,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
@@ -260,13 +260,13 @@ class TestCompareController(TestControll
 
#            cs5:
 
        # make repo1, and cs1+cs2
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo(u'repo1', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 
        self.r1_id = repo1.repo_id
 

	
 
        # commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
@@ -287,13 +287,13 @@ class TestCompareController(TestControll
 
                content='line1\nline2\nline3\nline4\nline5\n',
 
                message='commit5', vcs_type='hg', parent=cs3)
 
        cs5 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\nline4\nline5\nline6\n',
 
                message='commit6', vcs_type='hg', parent=cs4)
 

	
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=repo2.repo_name,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=cs1.short_id,  # parent of cs2, in repo2
 
                                    other_repo=repo1.repo_name,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=cs4.short_id,
 
@@ -328,13 +328,13 @@ class TestCompareController(TestControll
 
#            cs5: x
 
#
 
        # make repo1, and cs1+cs2
 
        self.log_user()
 
        repo1 = fixture.create_repo(u'repo1', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 
        self.r1_id = repo1.repo_id
 

	
 
        # commit something !
 
        cs0 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
@@ -355,13 +355,13 @@ class TestCompareController(TestControll
 
                content='line1\nline2\nline3\nline4\nline5\n',
 
                message='commit5', vcs_type='hg', parent=cs3)
 
        cs5 = fixture.commit_change(repo1.repo_name, filename='file1',
 
                content='line1\nline2\nline3\nline4\nline5\nline6\n',
 
                message='commit6', vcs_type='hg', parent=cs4)
 

	
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=repo1.repo_name,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=cs2.short_id, # parent of cs3, not in repo2
 
                                    other_ref_type="rev",
 
                                    other_ref_name=cs5.short_id,
 
                                    merge='1',))
 
@@ -385,86 +385,86 @@ class TestCompareController(TestControll
 
        # TODO: write this
 
        assert 1
 

	
 
    def test_compare_remote_branches_hg(self):
 
        self.log_user()
 

	
 
        repo2 = fixture.create_fork(HG_REPO, HG_FORK)
 
        repo2 = fixture.create_fork(base.HG_REPO, base.HG_FORK)
 
        self.r2_id = repo2.repo_id
 
        rev1 = '56349e29c2af'
 
        rev2 = '7d4bc8ec6be5'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=HG_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.HG_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    other_repo=HG_FORK,
 
                                    other_repo=base.HG_FORK,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (HG_REPO, rev1))
 
        response.mustcontain('%s@%s' % (HG_FORK, rev2))
 
        response.mustcontain('%s@%s' % (base.HG_REPO, rev1))
 
        response.mustcontain('%s@%s' % (base.HG_FORK, rev2))
 
        ## outgoing changesets between those revisions
 

	
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/2dda4e345facb0ccff1a191052dd1606dba6781d">r4:2dda4e345fac</a>""" % (HG_FORK))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/6fff84722075f1607a30f436523403845f84cd9e">r5:6fff84722075</a>""" % (HG_FORK))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7">r6:%s</a>""" % (HG_FORK, rev2))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/2dda4e345facb0ccff1a191052dd1606dba6781d">r4:2dda4e345fac</a>""" % (base.HG_FORK))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/6fff84722075f1607a30f436523403845f84cd9e">r5:6fff84722075</a>""" % (base.HG_FORK))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7">r6:%s</a>""" % (base.HG_FORK, rev2))
 

	
 
        ## files
 
        response.mustcontain("""<a href="#C--9c390eb52cd6">vcs/backends/hg.py</a>""")
 
        response.mustcontain("""<a href="#C--41b41c1f2796">vcs/backends/__init__.py</a>""")
 
        response.mustcontain("""<a href="#C--2f574d260608">vcs/backends/base.py</a>""")
 

	
 
    def test_compare_remote_branches_git(self):
 
        self.log_user()
 

	
 
        repo2 = fixture.create_fork(GIT_REPO, GIT_FORK)
 
        repo2 = fixture.create_fork(base.GIT_REPO, base.GIT_FORK)
 
        self.r2_id = repo2.repo_id
 
        rev1 = '102607b09cdd60e2793929c4f90478be29f85a17'
 
        rev2 = 'd7e0d30fbcae12c90680eb095a4f5f02505ce501'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=GIT_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.GIT_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    other_repo=GIT_FORK,
 
                                    other_repo=base.GIT_FORK,
 
                                    merge='1',))
 

	
 
        response.mustcontain('%s@%s' % (GIT_REPO, rev1))
 
        response.mustcontain('%s@%s' % (GIT_FORK, rev2))
 
        response.mustcontain('%s@%s' % (base.GIT_REPO, rev1))
 
        response.mustcontain('%s@%s' % (base.GIT_FORK, rev2))
 
        ## outgoing changesets between those revisions
 

	
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/49d3fd156b6f7db46313fac355dca1a0b94a0017">r4:49d3fd156b6f</a>""" % (GIT_FORK))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/2d1028c054665b962fa3d307adfc923ddd528038">r5:2d1028c05466</a>""" % (GIT_FORK))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/d7e0d30fbcae12c90680eb095a4f5f02505ce501">r6:%s</a>""" % (GIT_FORK, rev2[:12]))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/49d3fd156b6f7db46313fac355dca1a0b94a0017">r4:49d3fd156b6f</a>""" % (base.GIT_FORK))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/2d1028c054665b962fa3d307adfc923ddd528038">r5:2d1028c05466</a>""" % (base.GIT_FORK))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/d7e0d30fbcae12c90680eb095a4f5f02505ce501">r6:%s</a>""" % (base.GIT_FORK, rev2[:12]))
 

	
 
        ## files
 
        response.mustcontain("""<a href="#C--9c390eb52cd6">vcs/backends/hg.py</a>""")
 
        response.mustcontain("""<a href="#C--41b41c1f2796">vcs/backends/__init__.py</a>""")
 
        response.mustcontain("""<a href="#C--2f574d260608">vcs/backends/base.py</a>""")
 

	
 
    def test_org_repo_new_commits_after_forking_simple_diff_hg(self):
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo(u'one', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 

	
 
        self.r1_id = repo1.repo_id
 
        r1_name = repo1.repo_name
 

	
 
        cs0 = fixture.commit_change(repo=r1_name, filename='file1',
 
                content='line1', message='commit1', vcs_type='hg', newfile=True)
 
        Session().commit()
 
        assert repo1.scm_instance.revisions == [cs0.raw_id]
 
        # fork the repo1
 
        repo2 = fixture.create_fork(r1_name, u'one-fork',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        assert repo2.scm_instance.revisions == [cs0.raw_id]
 
        self.r2_id = repo2.repo_id
 
        r2_name = repo2.repo_name
 

	
 
        cs1 = fixture.commit_change(repo=r2_name, filename='file1-fork',
 
@@ -479,13 +479,13 @@ class TestCompareController(TestControll
 
                content='file3-line1-from-fork', message='commit3-fork',
 
                vcs_type='hg', parent=cs2, newfile=True)
 
        # compare !
 
        rev1 = 'default'
 
        rev2 = 'default'
 

	
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=r2_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
                                    other_repo=r1_name,
 
@@ -497,13 +497,13 @@ class TestCompareController(TestControll
 
                content='line1-added-after-fork', message='commit2-parent',
 
                vcs_type='hg', parent=None, newfile=True)
 

	
 
        # compare !
 
        rev1 = 'default'
 
        rev2 = 'default'
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=r2_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev2,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev1,
 
                                    other_repo=r1_name,
 
@@ -519,25 +519,25 @@ class TestCompareController(TestControll
 

	
 
    def test_org_repo_new_commits_after_forking_simple_diff_git(self):
 
        self.log_user()
 

	
 
        repo1 = fixture.create_repo(u'one-git', repo_type='git',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 

	
 
        self.r1_id = repo1.repo_id
 
        r1_name = repo1.repo_name
 

	
 
        cs0 = fixture.commit_change(repo=r1_name, filename='file1',
 
                content='line1', message='commit1', vcs_type='git',
 
                newfile=True)
 
        Session().commit()
 
        assert repo1.scm_instance.revisions == [cs0.raw_id]
 
        # fork the repo1
 
        repo2 = fixture.create_fork(r1_name, u'one-git-fork',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN)
 
                                    cur_user=base.TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        assert repo2.scm_instance.revisions == [cs0.raw_id]
 
        self.r2_id = repo2.repo_id
 
        r2_name = repo2.repo_name
 

	
 

	
 
@@ -553,13 +553,13 @@ class TestCompareController(TestControll
 
                content='file3-line1-from-fork', message='commit3-fork',
 
                vcs_type='git', parent=cs2, newfile=True)
 
        # compare !
 
        rev1 = 'master'
 
        rev2 = 'master'
 

	
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=r2_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev2,
 
                                    other_repo=r1_name,
 
@@ -571,13 +571,13 @@ class TestCompareController(TestControll
 
                content='line1-added-after-fork', message='commit2-parent',
 
                vcs_type='git', parent=None, newfile=True)
 

	
 
        # compare !
 
        rev1 = 'master'
 
        rev2 = 'master'
 
        response = self.app.get(url('compare_url',
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=r2_name,
 
                                    org_ref_type="branch",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="branch",
 
                                    other_ref_name=rev2,
 
                                    other_repo=r1_name,
kallithea/tests/functional/test_compare_local.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
class TestCompareController(TestController):
 
class TestCompareController(base.TestController):
 

	
 
    def test_compare_tag_hg(self):
 
        self.log_user()
 
        tag1 = 'v0.1.2'
 
        tag2 = 'v0.1.3'
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=HG_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.HG_REPO,
 
                                    org_ref_type="tag",
 
                                    org_ref_name=tag1,
 
                                    other_ref_type="tag",
 
                                    other_ref_name=tag2,
 
                                    ), status=200)
 
        response.mustcontain('%s@%s' % (HG_REPO, tag1))
 
        response.mustcontain('%s@%s' % (HG_REPO, tag2))
 
        response.mustcontain('%s@%s' % (base.HG_REPO, tag1))
 
        response.mustcontain('%s@%s' % (base.HG_REPO, tag2))
 

	
 
        ## outgoing changesets between tags
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/c5ddebc06eaaba3010c2d66ea6ec9d074eb0f678">r112:c5ddebc06eaa</a>''' % HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/70d4cef8a37657ee4cf5aabb3bd9f68879769816">r115:70d4cef8a376</a>''' % HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/9749bfbfc0d2eba208d7947de266303b67c87cda">r116:9749bfbfc0d2</a>''' % HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/41fda979f02fda216374bf8edac4e83f69e7581c">r117:41fda979f02f</a>''' % HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/bb1a3ab98cc45cb934a77dcabf87a5a598b59e97">r118:bb1a3ab98cc4</a>''' % HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/36e0fc9d2808c5022a24f49d6658330383ed8666">r119:36e0fc9d2808</a>''' % HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/17544fbfcd33ffb439e2b728b5d526b1ef30bfcf">r120:17544fbfcd33</a>''' % HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/c5ddebc06eaaba3010c2d66ea6ec9d074eb0f678">r112:c5ddebc06eaa</a>''' % base.HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/70d4cef8a37657ee4cf5aabb3bd9f68879769816">r115:70d4cef8a376</a>''' % base.HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/9749bfbfc0d2eba208d7947de266303b67c87cda">r116:9749bfbfc0d2</a>''' % base.HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/41fda979f02fda216374bf8edac4e83f69e7581c">r117:41fda979f02f</a>''' % base.HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/bb1a3ab98cc45cb934a77dcabf87a5a598b59e97">r118:bb1a3ab98cc4</a>''' % base.HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/36e0fc9d2808c5022a24f49d6658330383ed8666">r119:36e0fc9d2808</a>''' % base.HG_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/17544fbfcd33ffb439e2b728b5d526b1ef30bfcf">r120:17544fbfcd33</a>''' % base.HG_REPO)
 

	
 
        response.mustcontain('11 files changed with 94 insertions and 64 deletions')
 

	
 
        ## files diff
 
        response.mustcontain(
 
                   '''<span class="node">
 
@@ -77,30 +77,30 @@ class TestCompareController(TestControll
 
                          <a href="#C--3150cb87d4b7">vcs/utils/lazy.py</a>''')
 

	
 
    def test_compare_tag_git(self):
 
        self.log_user()
 
        tag1 = 'v0.1.2'
 
        tag2 = 'v0.1.3'
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=GIT_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.GIT_REPO,
 
                                    org_ref_type="tag",
 
                                    org_ref_name=tag1,
 
                                    other_ref_type="tag",
 
                                    other_ref_name=tag2,
 
                                    ), status=200)
 
        response.mustcontain('%s@%s' % (GIT_REPO, tag1))
 
        response.mustcontain('%s@%s' % (GIT_REPO, tag2))
 
        response.mustcontain('%s@%s' % (base.GIT_REPO, tag1))
 
        response.mustcontain('%s@%s' % (base.GIT_REPO, tag2))
 

	
 
        ## outgoing changesets between tags
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/794bbdd31545c199f74912709ea350dedcd189a2">r113:794bbdd31545</a>''' % GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/e36d8c5025329bdd4212bd53d4ed8a70ff44985f">r115:e36d8c502532</a>''' % GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/5c9ff4f6d7508db0e72b1d2991c357d0d8e07af2">r116:5c9ff4f6d750</a>''' % GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/b7187fa2b8c1d773ec35e9dee12f01f74808c879">r117:b7187fa2b8c1</a>''' % GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/5f3b74262014a8de2dc7dade1152de9fd0c8efef">r118:5f3b74262014</a>''' % GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/17438a11f72b93f56d0e08e7d1fa79a378578a82">r119:17438a11f72b</a>''' % GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/5a3a8fb005554692b16e21dee62bf02667d8dc3e">r120:5a3a8fb00555</a>''' % GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/794bbdd31545c199f74912709ea350dedcd189a2">r113:794bbdd31545</a>''' % base.GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/e36d8c5025329bdd4212bd53d4ed8a70ff44985f">r115:e36d8c502532</a>''' % base.GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/5c9ff4f6d7508db0e72b1d2991c357d0d8e07af2">r116:5c9ff4f6d750</a>''' % base.GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/b7187fa2b8c1d773ec35e9dee12f01f74808c879">r117:b7187fa2b8c1</a>''' % base.GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/5f3b74262014a8de2dc7dade1152de9fd0c8efef">r118:5f3b74262014</a>''' % base.GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/17438a11f72b93f56d0e08e7d1fa79a378578a82">r119:17438a11f72b</a>''' % base.GIT_REPO)
 
        response.mustcontain('''<a class="changeset_hash" href="/%s/changeset/5a3a8fb005554692b16e21dee62bf02667d8dc3e">r120:5a3a8fb00555</a>''' % base.GIT_REPO)
 

	
 
        response.mustcontain('11 files changed with 94 insertions and 64 deletions')
 

	
 
        # files
 
        response.mustcontain('''<a href="#C--1c5cf9e91c12">docs/api/utils/index.rst</a>''')
 
        response.mustcontain('''<a href="#C--e3305437df55">test_and_report.sh</a>''')
 
@@ -113,120 +113,120 @@ class TestCompareController(TestControll
 
        response.mustcontain('''<a href="#C--7abc741b5052">vcs/utils/annotate.py</a>''')
 
        response.mustcontain('''<a href="#C--2ef0ef106c56">vcs/utils/diffs.py</a>''')
 
        response.mustcontain('''<a href="#C--3150cb87d4b7">vcs/utils/lazy.py</a>''')
 

	
 
    def test_index_branch_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=HG_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.HG_REPO,
 
                                    org_ref_type="branch",
 
                                    org_ref_name='default',
 
                                    other_ref_type="branch",
 
                                    other_ref_name='default',
 
                                    ))
 

	
 
        response.mustcontain('%s@default' % (HG_REPO))
 
        response.mustcontain('%s@default' % (HG_REPO))
 
        response.mustcontain('%s@default' % (base.HG_REPO))
 
        response.mustcontain('%s@default' % (base.HG_REPO))
 
        # branch are equal
 
        response.mustcontain('<span class="text-muted">No files</span>')
 
        response.mustcontain('<span class="text-muted">No changesets</span>')
 

	
 
    def test_index_branch_git(self):
 
        self.log_user()
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=GIT_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.GIT_REPO,
 
                                    org_ref_type="branch",
 
                                    org_ref_name='master',
 
                                    other_ref_type="branch",
 
                                    other_ref_name='master',
 
                                    ))
 

	
 
        response.mustcontain('%s@master' % (GIT_REPO))
 
        response.mustcontain('%s@master' % (GIT_REPO))
 
        response.mustcontain('%s@master' % (base.GIT_REPO))
 
        response.mustcontain('%s@master' % (base.GIT_REPO))
 
        # branch are equal
 
        response.mustcontain('<span class="text-muted">No files</span>')
 
        response.mustcontain('<span class="text-muted">No changesets</span>')
 

	
 
    def test_compare_revisions_hg(self):
 
        self.log_user()
 
        rev1 = 'b986218ba1c9'
 
        rev2 = '3d8f361e72ab'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=HG_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.HG_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    ))
 
        response.mustcontain('%s@%s' % (HG_REPO, rev1))
 
        response.mustcontain('%s@%s' % (HG_REPO, rev2))
 
        response.mustcontain('%s@%s' % (base.HG_REPO, rev1))
 
        response.mustcontain('%s@%s' % (base.HG_REPO, rev2))
 

	
 
        ## outgoing changesets between those revisions
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/3d8f361e72ab303da48d799ff1ac40d5ac37c67e">r1:%s</a>""" % (HG_REPO, rev2))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/3d8f361e72ab303da48d799ff1ac40d5ac37c67e">r1:%s</a>""" % (base.HG_REPO, rev2))
 

	
 
        response.mustcontain('1 file changed with 7 insertions and 0 deletions')
 
        ## files
 
        response.mustcontain("""<a href="#C--c8e92ef85cd1">.hgignore</a>""")
 

	
 
    def test_compare_revisions_git(self):
 
        self.log_user()
 
        rev1 = 'c1214f7e79e02fc37156ff215cd71275450cffc3'
 
        rev2 = '38b5fe81f109cb111f549bfe9bb6b267e10bc557'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=GIT_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.GIT_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    ))
 
        response.mustcontain('%s@%s' % (GIT_REPO, rev1))
 
        response.mustcontain('%s@%s' % (GIT_REPO, rev2))
 
        response.mustcontain('%s@%s' % (base.GIT_REPO, rev1))
 
        response.mustcontain('%s@%s' % (base.GIT_REPO, rev2))
 

	
 
        ## outgoing changesets between those revisions
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/38b5fe81f109cb111f549bfe9bb6b267e10bc557">r1:%s</a>""" % (GIT_REPO, rev2[:12]))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/38b5fe81f109cb111f549bfe9bb6b267e10bc557">r1:%s</a>""" % (base.GIT_REPO, rev2[:12]))
 
        response.mustcontain('1 file changed with 7 insertions and 0 deletions')
 

	
 
        ## files
 
        response.mustcontain("""<a href="#C--c8e92ef85cd1">.hgignore</a>""")
 

	
 
    def test_compare_revisions_hg_is_ajax_preview(self):
 
        self.log_user()
 
        rev1 = 'b986218ba1c9'
 
        rev2 = '3d8f361e72ab'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=HG_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.HG_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    is_ajax_preview=True,
 
                                    ),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'},)
 

	
 
        ## outgoing changesets between those revisions
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/3d8f361e72ab303da48d799ff1ac40d5ac37c67e">r1:%s</a>""" % (HG_REPO, rev2))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/3d8f361e72ab303da48d799ff1ac40d5ac37c67e">r1:%s</a>""" % (base.HG_REPO, rev2))
 

	
 
        response.mustcontain('Merge Ancestor')
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/b986218ba1c9b0d6a259fac9b050b1724ed8e545">%s</a>""" % (HG_REPO, rev1))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/b986218ba1c9b0d6a259fac9b050b1724ed8e545">%s</a>""" % (base.HG_REPO, rev1))
 

	
 
    def test_compare_revisions_git_is_ajax_preview(self):
 
        self.log_user()
 
        rev1 = 'c1214f7e79e02fc37156ff215cd71275450cffc3'
 
        rev2 = '38b5fe81f109cb111f549bfe9bb6b267e10bc557'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=GIT_REPO,
 
        response = self.app.get(base.url('compare_url',
 
                                    repo_name=base.GIT_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    is_ajax_preview=True,
 
                                    ),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'},)
 
        ## outgoing changesets between those revisions
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/38b5fe81f109cb111f549bfe9bb6b267e10bc557">r1:%s</a>""" % (GIT_REPO, rev2[:12]))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/38b5fe81f109cb111f549bfe9bb6b267e10bc557">r1:%s</a>""" % (base.GIT_REPO, rev2[:12]))
 

	
 
        response.mustcontain('Merge Ancestor')
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/c1214f7e79e02fc37156ff215cd71275450cffc3">%s</a>""" % (GIT_REPO, rev1[:12]))
 
        response.mustcontain("""<a class="changeset_hash" href="/%s/changeset/c1214f7e79e02fc37156ff215cd71275450cffc3">%s</a>""" % (base.GIT_REPO, rev1[:12]))
kallithea/tests/functional/test_feed.py
Show inline comments
 
from kallithea.tests.base import *
 
from kallithea.tests import base
 

	
 

	
 
class TestFeedController(TestController):
 
class TestFeedController(base.TestController):
 

	
 
    def test_rss(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='feed', action='rss',
 
                                    repo_name=HG_REPO))
 
        response = self.app.get(base.url(controller='feed', action='rss',
 
                                    repo_name=base.HG_REPO))
 

	
 
        assert response.content_type == "application/rss+xml"
 
        assert """<rss version="2.0">""" in response
 

	
 
    def test_atom(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='feed', action='atom',
 
                                    repo_name=HG_REPO))
 
        response = self.app.get(base.url(controller='feed', action='atom',
 
                                    repo_name=base.HG_REPO))
 

	
 
        assert response.content_type == """application/atom+xml"""
 
        assert """<?xml version="1.0" encoding="utf-8"?>""" in response
 
        assert """<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="en-us">""" in response

Changeset was too big and was cut off... Show full diff anyway

0 comments (0 inline, 0 general)