Changeset - c76615e17a27
[Not reviewed]
default
0 2 0
timeless@gmail.com - 10 years ago 2016-05-03 07:08:23
timeless@gmail.com
spelling: committed
2 files changed with 6 insertions and 6 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/functional/test_files.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import os
 
import posixpath
 
import mimetypes
 
from kallithea.tests import *
 
from kallithea.model.db import Repository
 
from kallithea.model.meta import Session
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 
ARCHIVE_SPECS = {
 
    '.tar.bz2': ('application/x-bzip2', 'tbz2', ''),
 
    '.tar.gz': ('application/x-gzip', 'tgz', ''),
 
    '.zip': ('application/zip', 'zip', ''),
 
}
 

	
 
HG_NODE_HISTORY = fixture.load_resource('hg_node_history_response.json')
 
GIT_NODE_HISTORY = fixture.load_resource('git_node_history_response.json')
 

	
 

	
 
def _set_downloads(repo_name, set_to):
 
    repo = Repository.get_by_repo_name(repo_name)
 
    repo.enable_downloads = set_to
 
    Session().add(repo)
 
    Session().commit()
 

	
 

	
 
class TestFilesController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='/'))
 
        # Test response...
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/docs"><i class="icon-folder-open"></i><span>docs</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/vcs"><i class="icon-folder-open"></i><span>vcs</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.gitignore"><i class="icon-doc"></i><span>.gitignore</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.hgignore"><i class="icon-doc"></i><span>.hgignore</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.hgtags"><i class="icon-doc"></i><span>.hgtags</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.travis.yml"><i class="icon-doc"></i><span>.travis.yml</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/MANIFEST.in"><i class="icon-doc"></i><span>MANIFEST.in</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/README.rst"><i class="icon-doc"></i><span>README.rst</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/run_test_and_report.sh"><i class="icon-doc"></i><span>run_test_and_report.sh</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/setup.cfg"><i class="icon-doc"></i><span>setup.cfg</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/setup.py"><i class="icon-doc"></i><span>setup.py</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/test_and_report.sh"><i class="icon-doc"></i><span>test_and_report.sh</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/tox.ini"><i class="icon-doc"></i><span>tox.ini</span></a>' % HG_REPO)
 

	
 
    def test_index_revision(self):
 
        self.log_user()
 

	
 
        response = self.app.get(
 
            url(controller='files', action='index',
 
                repo_name=HG_REPO,
 
                revision='7ba66bec8d6dbba14a2155be32408c435c5f4492',
 
                f_path='/')
 
        )
 

	
 
        #Test response...
 

	
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/7ba66bec8d6dbba14a2155be32408c435c5f4492/docs"><i class="icon-folder-open"></i><span>docs</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/7ba66bec8d6dbba14a2155be32408c435c5f4492/tests"><i class="icon-folder-open"></i><span>tests</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/7ba66bec8d6dbba14a2155be32408c435c5f4492/README.rst"><i class="icon-doc"></i><span>README.rst</span></a>' % HG_REPO)
 
        response.mustcontain('1.1 KiB')
 

	
 
    def test_index_different_branch(self):
 
        self.log_user()
 

	
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='97e8b885c04894463c51898e14387d80c30ed1ee',
 
                                    f_path='/'))
 

	
 
        response.mustcontain("""<option selected="selected" value="97e8b885c04894463c51898e14387d80c30ed1ee">git at 97e8b885c048</option>""")
 

	
 
    def test_index_paging(self):
 
        self.log_user()
 

	
 
        for r in [(73, 'a066b25d5df7016b45a41b7e2a78c33b57adc235'),
 
                  (92, 'cc66b61b8455b264a7a8a2d8ddc80fcfc58c221e'),
 
                  (109, '75feb4c33e81186c87eac740cee2447330288412'),
 
                  (1, '3d8f361e72ab303da48d799ff1ac40d5ac37c67e'),
 
                  (0, 'b986218ba1c9b0d6a259fac9b050b1724ed8e545')]:
 

	
 
            response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision=r[1],
 
                                    f_path='/'))
 

	
 
            response.mustcontain("""@ r%s:%s""" % (r[0], r[1][:12]))
 

	
 
    def test_file_source(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='8911406ad776fdd3d0b9932a2e89677e57405a48',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        response.mustcontain("""<div class="commit">Partially implemented <a class="issue-tracker-link" href="https://issues.example.com/vcs_test_hg/issue/16">#16</a>. filecontent/commit message/author/node name are safe_unicode now.
 
In addition some other __str__ are unicode as well
 
Added test for unicode
 
Improved test to clone into uniq repository.
 
removed extra unicode conversion in diff.</div>
 
""")
 

	
 
        response.mustcontain("""<option selected="selected" value="8911406ad776fdd3d0b9932a2e89677e57405a48">default at 8911406ad776</option>""")
 

	
 
    def test_file_source_history(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py'),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'},)
 
        self.assertEqual(response.body, HG_NODE_HISTORY)
 

	
 
    def test_file_source_history_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py'),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'},)
 
        self.assertEqual(response.body, GIT_NODE_HISTORY)
 

	
 
    def test_file_annotation(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True))
 

	
 
        response.mustcontain("""r356:25213a5fbb04""")
 

	
 
    def test_file_annotation_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True))
 
        response.mustcontain("""r345:c994f0de03b2""")
 

	
 
    def test_file_annotation_history(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 

	
 
        self.assertEqual(response.body, HG_NODE_HISTORY)
 

	
 
    def test_file_annotation_history_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 

	
 
        self.assertEqual(response.body, GIT_NODE_HISTORY)
 

	
 
    def test_file_authors(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='authors',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True))
 
        response.mustcontain('Marcin Kuzminski')
 
        response.mustcontain('Lukasz Balcerzak')
 

	
 
    def test_file_authors_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='authors',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True))
 
        response.mustcontain('Marcin Kuzminski')
 
        response.mustcontain('Lukasz Balcerzak')
 

	
 
    def test_archival(self):
 
        self.log_user()
 
        _set_downloads(HG_REPO, set_to=True)
 
        for arch_ext, info in ARCHIVE_SPECS.items():
 
            short = '27cd5cce30c9%s' % arch_ext
 
            fname = '27cd5cce30c96924232dffcd24178a07ffeb5dfc%s' % arch_ext
 
            filename = '%s-%s' % (HG_REPO, short)
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 

	
 
            self.assertEqual(response.status, '200 OK')
 
            heads = [
 
                ('Pragma', 'no-cache'),
 
                ('Cache-Control', 'no-cache'),
 
                ('Content-Disposition', 'attachment; filename=%s' % filename),
 
                ('Content-Type', '%s; charset=utf-8' % info[0]),
 
            ]
 
            self.assertEqual(response.response._headers.items(), heads)
 

	
 
    def test_archival_wrong_ext(self):
 
        self.log_user()
 
        _set_downloads(HG_REPO, set_to=True)
 
        for arch_ext in ['tar', 'rar', 'x', '..ax', '.zipz']:
 
            fname = '27cd5cce30c96924232dffcd24178a07ffeb5dfc%s' % arch_ext
 

	
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 
            response.mustcontain('Unknown archive type')
 

	
 
    def test_archival_wrong_revision(self):
 
        self.log_user()
 
        _set_downloads(HG_REPO, set_to=True)
 
        for rev in ['00x000000', 'tar', 'wrong', '@##$@$42413232', '232dffcd']:
 
            fname = '%s.zip' % rev
 

	
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 
            response.mustcontain('Unknown revision')
 

	
 
    #==========================================================================
 
    # RAW FILE
 
    #==========================================================================
 
    def test_raw_file_ok(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        self.assertEqual(response.content_disposition, "attachment; filename=nodes.py")
 
        self.assertEqual(response.content_type, mimetypes.guess_type("nodes.py")[0])
 

	
 
    def test_raw_file_wrong_cs(self):
 
        self.log_user()
 
        rev = u'ERRORce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/nodes.py'
 

	
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path), status=404)
 

	
 
        msg = """Such revision does not exist for this repository"""
 
        response.mustcontain(msg)
 

	
 
    def test_raw_file_wrong_f_path(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/ERRORnodes.py'
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path), status=404)
 

	
 
        msg = "There is no file nor directory at the given path: &#39;%s&#39; at revision %s" % (f_path, rev[:12])
 
        response.mustcontain(msg)
 

	
 
    #==========================================================================
 
    # RAW RESPONSE - PLAIN
 
    #==========================================================================
 
    def test_raw_ok(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        self.assertEqual(response.content_type, "text/plain")
 

	
 
    def test_raw_wrong_cs(self):
 
        self.log_user()
 
        rev = u'ERRORcce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/nodes.py'
 

	
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path), status=404)
 

	
 
        msg = """Such revision does not exist for this repository"""
 
        response.mustcontain(msg)
 

	
 
    def test_raw_wrong_f_path(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/ERRORnodes.py'
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path), status=404)
 
        msg = "There is no file nor directory at the given path: &#39;%s&#39; at revision %s" % (f_path, rev[:12])
 
        response.mustcontain(msg)
 

	
 
    def test_ajaxed_files_list(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        response = self.app.get(
 
            url('files_nodelist_home', repo_name=HG_REPO, f_path='/',
 
                revision=rev),
 
            extra_environ={'HTTP_X_PARTIAL_XHR': '1'},
 
        )
 
        response.mustcontain("vcs/web/simplevcs/views/repository.py")
 

	
 
    # Hg - ADD FILE
 
    def test_add_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'))
 

	
 
    def test_add_file_into_hg_missing_content(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': '',
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No content')
 

	
 
    def test_add_file_into_hg_missing_filename(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No filename')
 

	
 
    @parametrize('location,filename', [
 
        ('/abs', 'foo'),
 
        ('../rel', 'foo'),
 
        ('file/../foo', 'foo'),
 
    ])
 
    def test_add_file_into_hg_bad_filenames(self, location, filename):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'Location must be relative path and must not contain .. in path')
 

	
 
    @parametrize('cnt,location,filename', [
 
        (1, '', 'foo.txt'),
 
        (2, 'dir', 'foo.rst'),
 
        (3, 'rel/dir', 'foo.bar'),
 
    ])
 
    def test_add_file_into_hg(self, cnt, location, filename):
 
        self.log_user()
 
        repo = fixture.create_repo(u'commit-test-%s' % cnt, repo_type='hg')
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - add file
 
    def test_add_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'))
 

	
 
    def test_add_file_into_git_missing_content(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                     'content': '',
 
                                     '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        self.checkSessionFlash(response, 'No content')
 

	
 
    def test_add_file_into_git_missing_filename(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No filename')
 

	
 
    @parametrize('location,filename', [
 
        ('/abs', 'foo'),
 
        ('../rel', 'foo'),
 
        ('file/../foo', 'foo'),
 
    ])
 
    def test_add_file_into_git_bad_filenames(self, location, filename):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'Location must be relative path and must not contain .. in path')
 

	
 
    @parametrize('cnt,location,filename', [
 
        (1, '', 'foo.txt'),
 
        (2, 'dir', 'foo.rst'),
 
        (3, 'rel/dir', 'foo.bar'),
 
    ])
 
    def test_add_file_into_git(self, cnt, location, filename):
 
        self.log_user()
 
        repo = fixture.create_repo(u'commit-test-%s' % cnt, repo_type='git')
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Hg - EDIT
 
    def test_edit_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_edit_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_edit_file_view_not_on_branch_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
            response = self.app.get(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path=posixpath.join(location, filename)),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only edit files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_edit_file_view_commit_changes_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
            response = self.app.post(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path=posixpath.join(location, filename)),
 
                                     params={
 
                                        'content': "def py():\n print 'hello world'\n",
 
                                        'message': 'i commited',
 
                                        'message': 'i committed',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - edit
 
    def test_edit_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_edit_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_edit_file_view_not_on_branch_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
            response = self.app.get(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path=posixpath.join(location, filename)),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only edit files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_edit_file_view_commit_changes_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
            response = self.app.post(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path=posixpath.join(location, filename)),
 
                                     params={
 
                                        'content': "def py():\n print 'hello world'\n",
 
                                        'message': 'i commited',
 
                                        'message': 'i committed',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Hg - delete
 
    def test_delete_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_delete_home',
 
                                     repo_name=HG_REPO,
 
                                     revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_delete_file_view_not_on_branch_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
            response = self.app.get(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path=posixpath.join(location, filename)),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only delete files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_delete_file_view_commit_changes_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
            response = self.app.post(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path=posixpath.join(location, filename)),
 
                                     params={
 
                                        'message': 'i commited',
 
                                        'message': 'i committed',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully deleted file %s' % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - delete
 
    def test_delete_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_delete_home',
 
                                     repo_name=HG_REPO,
 
                                     revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_delete_file_view_not_on_branch_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
            response = self.app.get(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path=posixpath.join(location, filename)),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only delete files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_delete_file_view_commit_changes_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
            response = self.app.post(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path=posixpath.join(location, filename)),
 
                                     params={
 
                                        'message': 'i commited',
 
                                        'message': 'i committed',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully deleted file %s' % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
kallithea/tests/other/manual_test_vcs_operations.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.other.manual_test_vcs_operations
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Test suite for making push/pull operations.
 

	
 
Run it in two terminals::
 
 paster serve kallithea/tests/test.ini
 
 KALLITHEA_WHOOSH_TEST_DISABLE=1 KALLITHEA_NO_TMP_PATH=1 nosetests kallithea/tests/other/manual_test_vcs_operations.py
 

	
 
You must have git > 1.8.1 for tests to work fine
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import re
 
import tempfile
 
import time
 
from os.path import join as jn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from kallithea.tests import *
 
from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 

	
 
DEBUG = True
 
HOST = '127.0.0.1:4999'  # test host
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args):
 
        """
 
        Runs command on the system with given ``args``.
 
        """
 

	
 
        command = cmd + ' ' + ' '.join(args)
 
        if DEBUG:
 
            print '*** CMD %s ***' % command
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            print 'stdout:', repr(stdout)
 
            print 'stderr:', repr(stderr)
 
        return stdout, stderr
 

	
 

	
 
def _get_tmp_dir():
 
    return tempfile.mkdtemp(prefix='rc_integration_test')
 

	
 

	
 
def _construct_url(repo, dest=None, **kwargs):
 
    if dest is None:
 
        #make temp clone
 
        dest = _get_tmp_dir()
 
    params = {
 
        'user': TEST_USER_ADMIN_LOGIN,
 
        'passwd': TEST_USER_ADMIN_PASS,
 
        'host': HOST,
 
        'cloned_repo': repo,
 
        'dest': dest
 
    }
 
    params.update(**kwargs)
 
    if params['user'] and params['passwd']:
 
        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
 
    else:
 
        _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
 
    return _url
 

	
 

	
 
def _add_files_and_push(vcs, DEST, **kwargs):
 
    """
 
    Generate some files, add it to DEST repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param DEST:
 
    """
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('%s add %s' % (vcs, added_file))
 

	
 
    for i in xrange(kwargs.get('files_no', 3)):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
        author_str = 'User ǝɯɐᴎ <me@example.com>'
 
        if vcs == 'hg':
 
            cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
 
            cmd = """hg commit -m 'committed new %s' -u '%s' %s """ % (
 
                i, author_str, added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """EMAIL="me@example.com" git commit -m 'commited new %s' --author '%s' %s """ % (
 
            cmd = """EMAIL="me@example.com" git commit -m 'committed new %s' --author '%s' %s """ % (
 
                i, author_str, added_file
 
            )
 
        Command(cwd).execute(cmd)
 

	
 
    # PUSH it back
 
    _REPO = None
 
    if vcs == 'hg':
 
        _REPO = HG_REPO
 
    elif vcs == 'git':
 
        _REPO = GIT_REPO
 

	
 
    kwargs['dest'] = ''
 
    clone_url = _construct_url(_REPO, **kwargs)
 
    if 'clone_url' in kwargs:
 
        clone_url = kwargs['clone_url']
 
    stdout = stderr = None
 
    if vcs == 'hg':
 
        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
 
    elif vcs == 'git':
 
        stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")
 

	
 
    return stdout, stderr
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_by_username(User.DEFAULT_USER)
 
    user.active = enable
 
    Session().add(user)
 
    Session().commit()
 
    print '\tanonymous access is now:', enable
 
    if enable != User.get_by_username(User.DEFAULT_USER).active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 

	
 
def _check_proper_git_push(stdout, stderr):
 
    #WTF Git stderr is output ?!
 
    assert 'fatal' not in stderr
 
    assert 'rejected' not in stderr
 
    assert 'Pushing to' in stderr
 
    assert 'master -> master' in stderr
 

	
 

	
 
class TestVCSOperations(BaseTestCase):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        #DISABLE ANONYMOUS ACCESS
 
        set_anonymous_access(False)
 

	
 
    def setUp(self):
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
    def test_clone_hg_repo_by_admin(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_clone_git_repo_by_admin(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
 

	
 
    def test_clone_wrong_credentials_hg(self):
 
        clone_url = _construct_url(HG_REPO, passwd='bad!')
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_clone_wrong_credentials_git(self):
 
        clone_url = _construct_url(GIT_REPO, passwd='bad!')
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        assert 'not found' in stderr
 

	
 
    def test_clone_non_existing_path_hg(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_non_existing_path_git(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        assert 'not found' in stderr
 

	
 
    def test_push_new_file_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
    def test_push_new_file_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        # commit some stuff into this repo
 
        stdout, stderr = _add_files_and_push('git', DEST)
 

	
 
        print [(x.repo_full_path,x.repo_path) for x in Repository.get_all()]
 
        _check_proper_git_push(stdout, stderr)
 

	
 
    def test_push_invalidates_cache_hg(self):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==HG_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(HG_REPO, HG_REPO)
 

	
 
        key.cache_active = True
 
        Session().add(key)
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST, files_no=1)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==HG_REPO).all()
 
        self.assertEqual(key, [])
 

	
 
    def test_push_invalidates_cache_git(self):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==GIT_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(GIT_REPO, GIT_REPO)
 

	
 
        key.cache_active = True
 
        Session().add(key)
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        # commit some stuff into this repo
 
        stdout, stderr = _add_files_and_push('git', DEST, files_no=1)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==GIT_REPO).all()
 
        self.assertEqual(key, [])
 

	
 
    def test_push_wrong_credentials_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
 
                                             passwd='name')
 

	
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_push_wrong_credentials_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
 
                                             passwd='name')
 

	
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_push_back_to_wrong_url_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST,
 
                                    clone_url='http://%s/tmp' % HOST)
 

	
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_push_back_to_wrong_url_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('git', DEST,
 
                                    clone_url='http://%s/tmp' % HOST)
 

	
 
        assert 'not found' in stderr
 

	
 
    def test_clone_and_create_lock_hg(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
    def test_clone_and_create_lock_git(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
    def test_clone_after_repo_was_locked_hg(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        #pull fails since repo is locked
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    def test_clone_after_repo_was_locked_git(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        #pull fails since repo is locked
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        msg = ("""The requested URL returned error: 423""")
 
        assert msg in stderr
 

	
 
    def test_push_on_locked_repo_by_other_user_hg(self):
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        #lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        #push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push('hg', DEST,
 
                                             user=TEST_USER_REGULAR_LOGIN,
 
                                             passwd=TEST_USER_REGULAR_PASS)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    def test_push_on_locked_repo_by_other_user_git(self):
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        #push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push('git', DEST,
 
                                             user=TEST_USER_REGULAR_LOGIN,
 
                                             passwd=TEST_USER_REGULAR_PASS)
 
        err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
 
        assert err in stderr
 

	
 
        #TODO: fix this somehow later on Git, Git is stupid and even if we throw
 
        #back 423 to it, it makes ANOTHER request and we fail there with 405 :/
 

	
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
 
        #msg = "405 Method Not Allowed"
 
        #assert msg in stderr
 

	
 
    def test_push_unlocks_repository_hg(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 
        assert r.locked[0] == uid
 

	
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('hg', DEST)
 
        assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked == [None, None]
 

	
 
    #TODO: fix me ! somehow during tests hooks don't get called on Git
 
    def test_push_unlocks_repository_git(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('git', DEST)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        assert r.locked == [None, None]
 

	
 
    def test_ip_restriction_hg(self):
 
        user_model = UserModel()
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            clone_url = _construct_url(HG_REPO)
 
            stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
            assert 'abort: HTTP Error 403: Forbidden' in stderr
 
        finally:
 
            #release IP restrictions
 
            for ip in UserIpMap.getAll():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 

	
 
        time.sleep(2)
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_ip_restriction_git(self):
 
        user_model = UserModel()
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            clone_url = _construct_url(GIT_REPO)
 
            stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
            # The message apparently changed in Git 1.8.3, so match it loosely.
 
            assert re.search(r'\b403\b', stderr)
 
        finally:
 
            #release IP restrictions
 
            for ip in UserIpMap.getAll():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 

	
 
        time.sleep(2)
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
0 comments (0 inline, 0 general)