Changeset - 94bbb7eb5b64
[Not reviewed]
default
0 5 0
Lars Kruse - 8 years ago 2017-08-25 14:33:18
devel@sumpfralle.de
codingstyle: replace upper-case variable names with lower-case ones

PEP8 issues reported by flake8.
5 files changed with 56 insertions and 52 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/functional/test_changeset_comments.py
Show inline comments
 
@@ -18,91 +18,93 @@ class TestChangeSetCommentsController(Te
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        assert ChangesetComment.query().count() == 1
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 

	
 
        assert Notification.query().count() == 1
 
        assert ChangesetComment.query().count() == 1
 

	
 
        notification = Notification.query().all()[0]
 

	
 
        ID = ChangesetComment.query().first().comment_id
 
        commit_id = ChangesetComment.query().first().comment_id
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s'
 
               % (HG_REPO, commit_id))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        assert sbj in notification.subject
 

	
 
    def test_create_inline(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 
        f_path = 'vcs/web/simplevcs/views/repository.py'
 
        line = 'n1'
 

	
 
        params = {'text': text, 'f_path': f_path, 'line': line, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        assert ChangesetComment.query().count() == 1
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (1 inline, 0 general)'''
 
        )
 
        response.mustcontain(
 
            '''<div class="comments-list-chunk" '''
 
            '''data-f_path="vcs/web/simplevcs/views/repository.py" '''
 
            '''data-line_no="n1" data-target-id="vcswebsimplevcsviewsrepositorypy_n1">'''
 
        )
 

	
 
        assert Notification.query().count() == 1
 
        assert ChangesetComment.query().count() == 1
 

	
 
        notification = Notification.query().all()[0]
 
        ID = ChangesetComment.query().first().comment_id
 
        commit_id = ChangesetComment.query().first().comment_id
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s'
 
               % (HG_REPO, commit_id))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        assert sbj in notification.subject
 

	
 
    def test_create_with_mention(self):
 
        self.log_user()
 

	
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'@%s check CommentOnRevision' % TEST_USER_REGULAR_LOGIN
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        assert ChangesetComment.query().count() == 1
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
kallithea/tests/other/test_libs.py
Show inline comments
 
@@ -276,54 +276,54 @@ class TestLibs(TestController):
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/prefix/', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'username'}, '/prefix/', 'http://username@vps1:8000/prefix/group/repo1'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {}, '', 'http://vps1:8000/_23'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'username'}, '', 'http://username@vps1:8000/_23'),
 
        ('http://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'username'}, '', 'http://username@vps1:8000/_23'),
 
        ('http://{netloc}/_{repoid}', 'group/repo1', {'user': 'username'}, '', 'http://vps1:8000/_23'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', {'user': 'username'}, '', 'https://username@proxy1.example.com/group/repo1'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', {}, '', 'https://proxy1.example.com/group/repo1'),
 
        ('https://proxy1.example.com/{user}/{repo}', 'group/repo1', {'user': 'username'}, '', 'https://proxy1.example.com/username/group/repo1'),
 
    ])
 
    def test_clone_url_generator(self, tmpl, repo_name, overrides, prefix, expected):
 
        from kallithea.lib.utils2 import get_clone_url
 
        clone_url = get_clone_url(uri_tmpl=tmpl, qualified_home_url='http://vps1:8000'+prefix,
 
                                  repo_name=repo_name, repo_id=23, **overrides)
 
        assert clone_url == expected
 

	
 
    def _quick_url(self, text, tmpl="""<a class="changeset_hash" href="%s">%s</a>""", url_=None):
 
        """
 
        Changes `some text url[foo]` => `some text <a href="/">foo</a>
 

	
 
        :param text:
 
        """
 
        import re
 
        # quickly change expected url[] into a link
 
        URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
 
        url_pattern = re.compile(r'(?:url\[)(.+?)(?:\])')
 

	
 
        def url_func(match_obj):
 
            _url = match_obj.groups()[0]
 
            return tmpl % (url_ or '/repo_name/changeset/%s' % _url, _url)
 
        return URL_PAT.sub(url_func, text)
 
        return url_pattern.sub(url_func, text)
 

	
 
    @parametrize('sample,expected', [
 
      ("",
 
       ""),
 
      ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
 
       """git-svn-id: <a href="https://svn.apache.org/repos/asf/libcloud/trunk@1441655">https://svn.apache.org/repos/asf/libcloud/trunk@1441655</a> 13f79535-47bb-0310-9956-ffa450edef68"""),
 
      ("from rev 000000000000",
 
       """from rev url[000000000000]"""),
 
      ("from rev 000000000000123123 also rev 000000000000",
 
       """from rev url[000000000000123123] also rev url[000000000000]"""),
 
      ("this should-000 00",
 
       """this should-000 00"""),
 
      ("longtextffffffffff rev 123123123123",
 
       """longtextffffffffff rev url[123123123123]"""),
 
      ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
 
       """rev ffffffffffffffffffffffffffffffffffffffffffffffffff"""),
 
      ("ffffffffffff some text traalaa",
 
       """url[ffffffffffff] some text traalaa"""),
 
       ("""Multi line
 
       123123123123
 
       some text 123123123123
 
       sometimes !
 
       """,
 
       """Multi line<br/>"""
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
@@ -62,59 +62,59 @@ class Command(object):
 
        if DEBUG:
 
            print '*** CMD %s ***' % command
 
        testenv = dict(os.environ)
 
        testenv['LANG'] = 'en_US.UTF-8'
 
        testenv['LANGUAGE'] = 'en_US:en'
 
        testenv['HGPLAIN'] = ''
 
        testenv['HGRCPATH'] = ''
 
        testenv.update(environ)
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            if stdout:
 
                print 'stdout:', repr(stdout)
 
            if stderr:
 
                print 'stderr:', repr(stderr)
 
        if not ignoreReturnCode:
 
            assert p.returncode == 0
 
        return stdout, stderr
 

	
 

	
 
def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
 
    return tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
 

	
 

	
 
def _add_files_and_push(webserver, vcs, DEST, ignoreReturnCode=False, files_no=3,
 
def _add_files_and_push(webserver, vcs, dest_dir, ignoreReturnCode=False, files_no=3,
 
                        clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
 
    """
 
    Generate some files, add it to DEST repo and push back
 
    Generate some files, add it to dest_dir repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param DEST:
 
    :param dest_dir:
 
    """
 
    # commit some stuff into this repo
 
    cwd = os.path.join(DEST)
 
    cwd = os.path.join(dest_dir)
 
    #added_file = '%ssetupążźć.py' % _RandomNameSequence().next()
 
    added_file = '%ssetup.py' % _RandomNameSequence().next()
 
    open(os.path.join(cwd, added_file), 'a').close()
 
    Command(cwd).execute('%s add %s' % (vcs, added_file))
 

	
 
    email = 'me@example.com'
 
    if os.name == 'nt':
 
        author_str = 'User <%s>' % email
 
    else:
 
        author_str = 'User ǝɯɐᴎ <%s>' % email
 
    for i in xrange(files_no):
 
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
        if vcs == 'hg':
 
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        # git commit needs EMAIL on some machines
 
        Command(cwd).execute(cmd, EMAIL=email)
 

	
 
@@ -206,160 +206,162 @@ class TestVCSOperations(TestController):
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    def test_clone_non_existing_path_hg(self, webserver):
 
        clone_url = webserver.repo_url('trololo')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_non_existing_path_git(self, webserver):
 
        clone_url = webserver.repo_url('trololo')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    def test_push_new_file_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
    def test_push_new_file_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # commit some stuff into this repo
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
 
        print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for
 
        _check_proper_git_push(stdout, stderr)
 

	
 
    def test_push_invalidates_cache_hg(self, webserver):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == HG_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(HG_REPO, HG_REPO)
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, files_no=1, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, files_no=1, clone_url=clone_url)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == fork_name).all()
 
        assert key == []
 

	
 
    def test_push_invalidates_cache_git(self, webserver):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == GIT_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(GIT_REPO, GIT_REPO)
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # commit some stuff into this repo
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, files_no=1, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, files_no=1, clone_url=clone_url)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == fork_name).all()
 
        assert key == []
 

	
 
    def test_push_wrong_credentials_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, username='bad',
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, username='bad',
 
                                             password='name', ignoreReturnCode=True)
 

	
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_push_wrong_credentials_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, username='bad',
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, username='bad',
 
                                             password='name', ignoreReturnCode=True)
 

	
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_push_back_to_wrong_url_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
 
                                    clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
 
        stdout, stderr = _add_files_and_push(
 
            webserver, 'hg', dest_dir, clone_url='http://%s:%s/tmp' % (
 
                webserver.server_address[0], webserver.server_address[1]),
 
                                    ignoreReturnCode = True)
 

	
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_push_back_to_wrong_url_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
 
                                    clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
 
        stdout, stderr = _add_files_and_push(
 
            webserver, 'git', dest_dir, clone_url='http://%s:%s/tmp' % (
 
                webserver.server_address[0], webserver.server_address[1]),
 
                                    ignoreReturnCode = True)
 

	
 
        assert 'not found' in stderr
 

	
 
    def test_clone_and_create_lock_hg(self, webserver):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().commit()
 
        # clone
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())
 

	
 
        # check if lock was made
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
    def test_clone_and_create_lock_git(self, webserver):
 
        # enable locking
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        r.enable_locking = True
 
        Session().commit()
 
        # clone
 
        clone_url = webserver.repo_url(GIT_REPO)
 
@@ -371,146 +373,146 @@ class TestVCSOperations(TestController):
 

	
 
    def test_clone_after_repo_was_locked_hg(self, webserver):
 
        # lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        # pull fails since repo is locked
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    def test_clone_after_repo_was_locked_git(self, webserver):
 
        # lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        # pull fails since repo is locked
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        msg = ("""The requested URL returned error: 423""")
 
        assert msg in stderr
 

	
 
    def test_push_on_locked_repo_by_other_user_hg(self, webserver):
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        # lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        # push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir,
 
                                             username=TEST_USER_REGULAR_LOGIN,
 
                                             password=TEST_USER_REGULAR_PASS,
 
                                             ignoreReturnCode=True)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    def test_push_on_locked_repo_by_other_user_git(self, webserver):
 
        # Note: Git hooks must be executable on unix. This test will thus fail
 
        # for example on Linux if /tmp is mounted noexec.
 

	
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        # push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir,
 
                                             username=TEST_USER_REGULAR_LOGIN,
 
                                             password=TEST_USER_REGULAR_PASS,
 
                                             ignoreReturnCode=True)
 
        err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
 
        assert err in stderr
 

	
 
        # TODO: fix this somehow later on Git, Git is stupid and even if we throw
 
        # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
 

	
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
 
        #msg = "405 Method Not Allowed"
 
        #assert msg in stderr
 

	
 
    def test_push_unlocks_repository_hg(self, webserver):
 
        # enable locking
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        r.enable_locking = True
 
        Session().commit()
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        # check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 
        assert r.locked[0] == uid
 

	
 
        # push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
 
        assert str('remote: Released lock on repo `%s`' % fork_name) in stdout
 
        # we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked == [None, None]
 

	
 
    # TODO: fix me ! somehow during tests hooks don't get called on Git
 
    def test_push_unlocks_repository_git(self, webserver):
 
        # enable locking
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        r.enable_locking = True
 
        Session().commit()
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
        # push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        assert ('remote: Released lock on repo `%s`' % fork_name) in stderr
 
        # we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked == [None, None]
 

	
 
    def test_ip_restriction_hg(self, webserver):
 
        user_model = UserModel()
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            clone_url = webserver.repo_url(HG_REPO)
 
            stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
            assert 'abort: HTTP Error 403: Forbidden' in stderr
 
        finally:
 
            # release IP restrictions
 
            for ip in UserIpMap.query():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 

	
 
        # IP permissions are cached, need to wait for the cache in the server process to expire
 
        time.sleep(1.5)
kallithea/tests/vcs/test_git.py
Show inline comments
 
@@ -215,63 +215,63 @@ class GitRepositoryTest(unittest.TestCas
 
        for path in ('', 'vcs', 'vcs/backends'):
 
            self.assertTrue(isinstance(init_chset.get_node(path), DirNode))
 

	
 
        self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
 

	
 
        node = init_chset.get_node('vcs/')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs/__init__.py')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.FILE)
 

	
 
    def test_not_existing_changeset(self):
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'f' * 40)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(self.repo.revisions[9])
 
        README = """===
 
        readme = """===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
------------
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 
        self.assertEqual(node.content, readme)
 

	
 

	
 
class GitChangesetTest(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_default_changeset(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip, self.repo.get_changeset(None))
 
        self.assertEqual(tip, self.repo.get_changeset('tip'))
 

	
 
    def test_root_node(self):
 
        tip = self.repo.get_changeset()
 
        self.assertTrue(tip.root is tip.get_node(''))
 

	
 
    def test_lazy_fetch(self):
 
        """
 
        Test if changeset's nodes expands and are cached as we walk through
 
        the revision. This test is somewhat hard to write as order of tests
 
        is a key here. Written by running command after command in a shell.
 
        """
 
        hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
 
        self.assertTrue(hex in self.repo.revisions)
kallithea/tests/vcs/test_hg.py
Show inline comments
 
@@ -197,63 +197,63 @@ class MercurialRepositoryTest(unittest.T
 

	
 
        node = init_chset.get_node('vcs/__init__.py')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.FILE)
 

	
 
    def test_not_existing_changeset(self):
 
        # rawid
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'abcd' * 10)
 
        # shortid
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'erro' * 4)
 
        # numeric
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            self.repo.count() + 1)
 

	
 

	
 
        # Small chance we ever get to this one
 
        revision = pow(2, 30)
 
        self.assertRaises(RepositoryError, self.repo.get_changeset, revision)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(10)
 
        README = """===
 
        readme = """===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
------------
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 
        self.assertEqual(node.content, readme)
 

	
 

	
 
class MercurialChangesetTest(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
 

	
 
    def _test_equality(self, changeset):
 
        revision = changeset.revision
 
        self.assertEqual(changeset, self.repo.get_changeset(revision))
 

	
 
    def test_equality(self):
 
        self.setUp()
 
        revs = [0, 10, 20]
 
        changesets = [self.repo.get_changeset(rev) for rev in revs]
 
        for changeset in changesets:
 
            self._test_equality(changeset)
 

	
 
    def test_default_changeset(self):
 
        tip = self.repo.get_changeset('tip')
 
        self.assertEqual(tip, self.repo.get_changeset())
 
        self.assertEqual(tip, self.repo.get_changeset(revision=None))
 
        self.assertEqual(tip, list(self.repo[-1:])[0])
 

	
0 comments (0 inline, 0 general)