Changeset - 94bbb7eb5b64
[Not reviewed]
default
0 5 0
Lars Kruse - 8 years ago 2017-08-25 14:33:18
devel@sumpfralle.de
codingstyle: replace upper-case variable names with lower-case ones

PEP8 issues reported by flake8.
5 files changed with 56 insertions and 52 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/functional/test_changeset_comments.py
Show inline comments
 
@@ -39,10 +39,11 @@ class TestChangeSetCommentsController(Te
 

	
 
        notification = Notification.query().all()[0]
 

	
 
        ID = ChangesetComment.query().first().comment_id
 
        commit_id = ChangesetComment.query().first().comment_id
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s'
 
               % (HG_REPO, commit_id))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        assert sbj in notification.subject
 

	
 
@@ -78,10 +79,11 @@ class TestChangeSetCommentsController(Te
 
        assert ChangesetComment.query().count() == 1
 

	
 
        notification = Notification.query().all()[0]
 
        ID = ChangesetComment.query().first().comment_id
 
        commit_id = ChangesetComment.query().first().comment_id
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s'
 
               % (HG_REPO, commit_id))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        assert sbj in notification.subject
 

	
kallithea/tests/other/test_libs.py
Show inline comments
 
@@ -297,12 +297,12 @@ class TestLibs(TestController):
 
        """
 
        import re
 
        # quickly change expected url[] into a link
 
        URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
 
        url_pattern = re.compile(r'(?:url\[)(.+?)(?:\])')
 

	
 
        def url_func(match_obj):
 
            _url = match_obj.groups()[0]
 
            return tmpl % (url_ or '/repo_name/changeset/%s' % _url, _url)
 
        return URL_PAT.sub(url_func, text)
 
        return url_pattern.sub(url_func, text)
 

	
 
    @parametrize('sample,expected', [
 
      ("",
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
@@ -83,17 +83,17 @@ def _get_tmp_dir(prefix='vcs_operations-
 
    return tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
 

	
 

	
 
def _add_files_and_push(webserver, vcs, DEST, ignoreReturnCode=False, files_no=3,
 
def _add_files_and_push(webserver, vcs, dest_dir, ignoreReturnCode=False, files_no=3,
 
                        clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
 
    """
 
    Generate some files, add it to DEST repo and push back
 
    Generate some files, add it to dest_dir repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param DEST:
 
    :param dest_dir:
 
    """
 
    # commit some stuff into this repo
 
    cwd = os.path.join(DEST)
 
    cwd = os.path.join(dest_dir)
 
    #added_file = '%ssetupążźć.py' % _RandomNameSequence().next()
 
    added_file = '%ssetup.py' % _RandomNameSequence().next()
 
    open(os.path.join(cwd, added_file), 'a').close()
 
@@ -227,29 +227,29 @@ class TestVCSOperations(TestController):
 
        assert 'not found' in stderr
 

	
 
    def test_push_new_file_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
    def test_push_new_file_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # commit some stuff into this repo
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
 
        print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for
 
        _check_proper_git_push(stdout, stderr)
 

	
 
@@ -263,14 +263,14 @@ class TestVCSOperations(TestController):
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, files_no=1, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, files_no=1, clone_url=clone_url)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == fork_name).all()
 
@@ -286,15 +286,15 @@ class TestVCSOperations(TestController):
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # commit some stuff into this repo
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, files_no=1, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, files_no=1, clone_url=clone_url)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
@@ -302,43 +302,45 @@ class TestVCSOperations(TestController):
 
        assert key == []
 

	
 
    def test_push_wrong_credentials_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, username='bad',
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, username='bad',
 
                                             password='name', ignoreReturnCode=True)
 

	
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_push_wrong_credentials_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, username='bad',
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, username='bad',
 
                                             password='name', ignoreReturnCode=True)
 

	
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_push_back_to_wrong_url_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
 
                                    clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
 
        stdout, stderr = _add_files_and_push(
 
            webserver, 'hg', dest_dir, clone_url='http://%s:%s/tmp' % (
 
                webserver.server_address[0], webserver.server_address[1]),
 
                                    ignoreReturnCode = True)
 

	
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_push_back_to_wrong_url_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
 
                                    clone_url='http://%s:%s/tmp' % (webserver.server_address[0], webserver.server_address[1]),
 
        stdout, stderr = _add_files_and_push(
 
            webserver, 'git', dest_dir, clone_url='http://%s:%s/tmp' % (
 
                webserver.server_address[0], webserver.server_address[1]),
 
                                    ignoreReturnCode = True)
 

	
 
        assert 'not found' in stderr
 
@@ -392,9 +394,9 @@ class TestVCSOperations(TestController):
 

	
 
    def test_push_on_locked_repo_by_other_user_hg(self, webserver):
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        # lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
@@ -405,7 +407,7 @@ class TestVCSOperations(TestController):
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        # push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST,
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir,
 
                                             username=TEST_USER_REGULAR_LOGIN,
 
                                             password=TEST_USER_REGULAR_PASS,
 
                                             ignoreReturnCode=True)
 
@@ -418,9 +420,9 @@ class TestVCSOperations(TestController):
 
        # for example on Linux if /tmp is mounted noexec.
 

	
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
@@ -431,7 +433,7 @@ class TestVCSOperations(TestController):
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        # push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST,
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir,
 
                                             username=TEST_USER_REGULAR_LOGIN,
 
                                             password=TEST_USER_REGULAR_PASS,
 
                                             ignoreReturnCode=True)
 
@@ -454,9 +456,9 @@ class TestVCSOperations(TestController):
 
        r.enable_locking = True
 
        Session().commit()
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        # check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
@@ -464,7 +466,7 @@ class TestVCSOperations(TestController):
 
        assert r.locked[0] == uid
 

	
 
        # push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
 
        assert str('remote: Released lock on repo `%s`' % fork_name) in stdout
 
        # we need to cleanup the Session Here !
 
        Session.remove()
 
@@ -480,16 +482,16 @@ class TestVCSOperations(TestController):
 
        r.enable_locking = True
 
        Session().commit()
 
        # clone some temp
 
        DEST = _get_tmp_dir()
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
        # push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        assert ('remote: Released lock on repo `%s`' % fork_name) in stderr
kallithea/tests/vcs/test_git.py
Show inline comments
 
@@ -236,7 +236,7 @@ class GitRepositoryTest(unittest.TestCas
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(self.repo.revisions[9])
 
        README = """===
 
        readme = """===
 
VCS
 
===
 

	
 
@@ -250,7 +250,7 @@ TODO: To be written...
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 
        self.assertEqual(node.content, readme)
 

	
 

	
 
class GitChangesetTest(unittest.TestCase):
kallithea/tests/vcs/test_hg.py
Show inline comments
 
@@ -218,7 +218,7 @@ class MercurialRepositoryTest(unittest.T
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(10)
 
        README = """===
 
        readme = """===
 
VCS
 
===
 

	
 
@@ -232,7 +232,7 @@ TODO: To be written...
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 
        self.assertEqual(node.content, readme)
 

	
 

	
 
class MercurialChangesetTest(unittest.TestCase):
0 comments (0 inline, 0 general)