Changeset - 94bbb7eb5b64
[Not reviewed]
default
0 5 0
Lars Kruse - 8 years ago 2017-08-25 14:33:18
devel@sumpfralle.de
codingstyle: replace upper-case variable names with lower-case ones

PEP8 issues reported by flake8.
5 files changed with 58 insertions and 54 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/functional/test_changeset_comments.py
Show inline comments
 
@@ -36,16 +36,17 @@ class TestChangeSetCommentsController(Te
 

	
 
        assert Notification.query().count() == 1
 
        assert ChangesetComment.query().count() == 1
 

	
 
        notification = Notification.query().all()[0]
 

	
 
        ID = ChangesetComment.query().first().comment_id
 
        commit_id = ChangesetComment.query().first().comment_id
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s'
 
               % (HG_REPO, commit_id))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        assert sbj in notification.subject
 

	
 
    def test_create_inline(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
@@ -75,16 +76,17 @@ class TestChangeSetCommentsController(Te
 
        )
 

	
 
        assert Notification.query().count() == 1
 
        assert ChangesetComment.query().count() == 1
 

	
 
        notification = Notification.query().all()[0]
 
        ID = ChangesetComment.query().first().comment_id
 
        commit_id = ChangesetComment.query().first().comment_id
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s'
 
               % (HG_REPO, commit_id))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        assert sbj in notification.subject
 

	
 
    def test_create_with_mention(self):
 
        self.log_user()
 

	
kallithea/tests/other/test_libs.py
Show inline comments
 
@@ -294,18 +294,18 @@ class TestLibs(TestController):
 
        Changes `some text url[foo]` => `some text <a href="/">foo</a>
 

	
 
        :param text:
 
        """
 
        import re
 
        # quickly change expected url[] into a link
 
        URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
 
        url_pattern = re.compile(r'(?:url\[)(.+?)(?:\])')
 

	
 
        def url_func(match_obj):
 
            _url = match_obj.groups()[0]
 
            return tmpl % (url_ or '/repo_name/changeset/%s' % _url, _url)
 
        return URL_PAT.sub(url_func, text)
 
        return url_pattern.sub(url_func, text)
 

	
 
    @parametrize('sample,expected', [
 
      ("",
 
       ""),
 
      ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
 
       """git-svn-id: <a href="https://svn.apache.org/repos/asf/libcloud/trunk@1441655">https://svn.apache.org/repos/asf/libcloud/trunk@1441655</a> 13f79535-47bb-0310-9956-ffa450edef68"""),
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
@@ -80,23 +80,23 @@ class Command(object):
 

	
 

	
 
def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
 
    return tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
 

	
 

	
 
def _add_files_and_push(webserver, vcs, DEST, ignoreReturnCode=False, files_no=3,
 
def _add_files_and_push(webserver, vcs, dest_dir, ignoreReturnCode=False, files_no=3,
 
                        clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
 
    """
 
    Generate some files, add it to DEST repo and push back
 
    Generate some files, add it to dest_dir repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param DEST:
 
    :param dest_dir:
 
    """
 
    # commit some stuff into this repo
 
    cwd = os.path.join(DEST)
 
    cwd = os.path.join(dest_dir)
 
    #added_file = '%ssetupążźć.py' % _RandomNameSequence().next()
 
    added_file = '%ssetup.py' % _RandomNameSequence().next()
 
    open(os.path.join(cwd, added_file), 'a').close()
 
    Command(cwd).execute('%s add %s' % (vcs, added_file))
 

	
 
    email = 'me@example.com'
 
@@ -224,35 +224,35 @@ class TestVCSOperations(TestController):
 
    def test_clone_non_existing_path_git(self, webserver):
 
        clone_url = webserver.repo_url('trololo')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    def test_push_new_file_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
    def test_push_new_file_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # commit some stuff into this repo
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
 
        print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for
 
        _check_proper_git_push(stdout, stderr)
 

	
 
    def test_push_invalidates_cache_hg(self, webserver):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == HG_REPO).scalar()
 
@@ -260,20 +260,20 @@ class TestVCSOperations(TestController):
 
            key = CacheInvalidation(HG_REPO, HG_REPO)
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, files_no=1, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, files_no=1, clone_url=clone_url)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == fork_name).all()
 
        assert key == []
 

	
 
    def test_push_invalidates_cache_git(self, webserver):
 
@@ -283,66 +283,68 @@ class TestVCSOperations(TestController):
 
            key = CacheInvalidation(GIT_REPO, GIT_REPO)
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # commit some stuff into this repo
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, files_no=1, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, files_no=1, clone_url=clone_url)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == fork_name).all()
 
        assert key == []
 

	
 
    def test_push_wrong_credentials_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, username='bad',
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, username='bad',
 
                                             password='name', ignoreReturnCode=True)
 

	
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_push_wrong_credentials_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, username='bad',
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, username='bad',
 
                                             password='name', ignoreReturnCode=True)
 

	
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_push_back_to_wrong_url_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
 
                                    clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
 
                                    ignoreReturnCode = True)
 
        stdout, stderr = _add_files_and_push(
 
            webserver, 'hg', dest_dir, clone_url='http://%s:%s/tmp' % (
 
                webserver.server_address[0], webserver.server_address[1]),
 
            ignoreReturnCode=True)
 

	
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_push_back_to_wrong_url_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
 
                                    clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
 
                                    ignoreReturnCode = True)
 
        stdout, stderr = _add_files_and_push(
 
            webserver, 'git', dest_dir, clone_url='http://%s:%s/tmp' % (
 
                webserver.server_address[0], webserver.server_address[1]),
 
            ignoreReturnCode=True)
 

	
 
        assert 'not found' in stderr
 

	
 
    def test_clone_and_create_lock_hg(self, webserver):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
@@ -389,52 +391,52 @@ class TestVCSOperations(TestController):
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        msg = ("""The requested URL returned error: 423""")
 
        assert msg in stderr
 

	
 
    def test_push_on_locked_repo_by_other_user_hg(self, webserver):
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        # lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        # push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir,
 
                                             username=TEST_USER_REGULAR_LOGIN,
 
                                             password=TEST_USER_REGULAR_PASS,
 
                                             ignoreReturnCode=True)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    def test_push_on_locked_repo_by_other_user_git(self, webserver):
 
        # Note: Git hooks must be executable on unix. This test will thus fail
 
        # for example on Linux if /tmp is mounted noexec.
 

	
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        # push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir,
 
                                             username=TEST_USER_REGULAR_LOGIN,
 
                                             password=TEST_USER_REGULAR_PASS,
 
                                             ignoreReturnCode=True)
 
        err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
 
        assert err in stderr
 

	
 
@@ -451,23 +453,23 @@ class TestVCSOperations(TestController):
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        r.enable_locking = True
 
        Session().commit()
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        # check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 
        assert r.locked[0] == uid
 

	
 
        # push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
 
        assert str('remote: Released lock on repo `%s`' % fork_name) in stdout
 
        # we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked == [None, None]
 

	
 
@@ -477,22 +479,22 @@ class TestVCSOperations(TestController):
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        r.enable_locking = True
 
        Session().commit()
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
        # push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        assert ('remote: Released lock on repo `%s`' % fork_name) in stderr
 
        # we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(fork_name)
kallithea/tests/vcs/test_git.py
Show inline comments
 
@@ -233,13 +233,13 @@ class GitRepositoryTest(unittest.TestCas
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'f' * 40)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(self.repo.revisions[9])
 
        README = """===
 
        readme = """===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
@@ -247,13 +247,13 @@ Introduction
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 
        self.assertEqual(node.content, readme)
 

	
 

	
 
class GitChangesetTest(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
kallithea/tests/vcs/test_hg.py
Show inline comments
 
@@ -215,13 +215,13 @@ class MercurialRepositoryTest(unittest.T
 
        revision = pow(2, 30)
 
        self.assertRaises(RepositoryError, self.repo.get_changeset, revision)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(10)
 
        README = """===
 
        readme = """===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
@@ -229,13 +229,13 @@ Introduction
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 
        self.assertEqual(node.content, readme)
 

	
 

	
 
class MercurialChangesetTest(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
0 comments (0 inline, 0 general)