Changeset - 29d0ed59e674
[Not reviewed]
default
0 8 0
Thomas De Schampheleire - 10 years ago 2015-06-20 21:40:59
thomas.de.schampheleire@gmail.com
tests: remove hardcoded test_admin username
8 files changed with 22 insertions and 22 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/functional/test_admin_gists.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.model.gist import GistModel
 
from kallithea.model.meta import Session
 
from kallithea.model.db import User, Gist
 

	
 

	
 
def _create_gist(f_name, content='some gist', lifetime=-1,
 
                 description=u'gist-desc', gist_type='public',
 
                 owner=TEST_USER_ADMIN_LOGIN):
 
    gist_mapping = {
 
        f_name: {'content': content}
 
    }
 
    user = User.get_by_username(owner)
 
    gist = GistModel().create(description, owner=user,
 
                       gist_mapping=gist_mapping, gist_type=gist_type,
 
                       lifetime=lifetime)
 
    Session().commit()
 
    return gist
 

	
 

	
 
class TestGistsController(TestController):
 

	
 
    def tearDown(self):
 
        for g in Gist.get_all():
 
            GistModel().delete(g)
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('gists'))
 
        # Test response...
 
        response.mustcontain('There are no gists yet')
 

	
 
        g1 = _create_gist('gist1').gist_access_id
 
        g2 = _create_gist('gist2', lifetime=1400).gist_access_id
 
        g3 = _create_gist('gist3', description=u'gist3-desc').gist_access_id
 
        g4 = _create_gist('gist4', gist_type='private').gist_access_id
 
        response = self.app.get(url('gists'))
 
        # Test response...
 
        response.mustcontain('gist: %s' % g1)
 
        response.mustcontain('gist: %s' % g2)
 
        response.mustcontain('Expires: in 23 hours')  # we don't care about the end
 
        response.mustcontain('gist: %s' % g3)
 
        response.mustcontain('gist3-desc')
 
        response.mustcontain(no=['gist: %s' % g4])
 

	
 
    def test_index_private_gists(self):
 
        self.log_user()
 
        gist = _create_gist('gist5', gist_type='private')
 
        response = self.app.get(url('gists', private=1))
 
        # Test response...
 

	
 
        #and privates
 
        response.mustcontain('gist: %s' % gist.gist_access_id)
 

	
 
    def test_create_missing_description(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1, '_authentication_token': self.authentication_token()},
 
                                 status=200)
 

	
 
        response.mustcontain('Missing value')
 

	
 
    def test_create(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': 'foo',
 
                                         'public': 'public',
 
                                         '_authentication_token': self.authentication_token()},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: foo')
 
        response.mustcontain('gist test')
 
        response.mustcontain('<div class="btn btn-mini btn-success disabled">Public Gist</div>')
 

	
 
    def test_create_with_path_with_dirs(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': '/home/foo',
 
                                         'public': 'public',
 
                                         '_authentication_token': self.authentication_token()},
 
                                 status=200)
 
        response.mustcontain('Filename cannot be inside a directory')
 

	
 
    def test_access_expired_gist(self):
 
        self.log_user()
 
        gist = _create_gist('never-see-me')
 
        gist.gist_expires = 0  # 1970
 
        Session().add(gist)
 
        Session().commit()
 

	
 
        response = self.app.get(url('gist', gist_id=gist.gist_access_id), status=404)
 

	
 
    def test_create_private(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'private gist test',
 
                                         'filename': 'private-foo',
 
                                         'private': 'private',
 
                                         '_authentication_token': self.authentication_token()},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: private-foo<')
 
        response.mustcontain('private gist test')
 
        response.mustcontain('<div class="btn btn-mini btn-warning disabled">Private Gist</div>')
 

	
 
    def test_create_with_description(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': 'foo-desc',
 
                                         'description': 'gist-desc',
 
                                         'public': 'public',
 
                                         '_authentication_token': self.authentication_token()},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: foo-desc')
 
        response.mustcontain('gist test')
 
        response.mustcontain('gist-desc')
 
        response.mustcontain('<div class="btn btn-mini btn-success disabled">Public Gist</div>')
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_gist'))
 

	
 
    def test_update(self):
 
        self.skipTest('not implemented')
 
        response = self.app.put(url('gist', gist_id=1))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        gist = _create_gist('delete-me')
 
        response = self.app.delete(url('gist', gist_id=gist.gist_id))
 
        self.checkSessionFlash(response, 'Deleted gist %s' % gist.gist_id)
 

	
 
    def test_delete_normal_user_his_gist(self):
 
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        gist = _create_gist('delete-me', owner=TEST_USER_REGULAR_LOGIN)
 
        response = self.app.delete(url('gist', gist_id=gist.gist_id))
 
        self.checkSessionFlash(response, 'Deleted gist %s' % gist.gist_id)
 

	
 
    def test_delete_normal_user_not_his_own_gist(self):
 
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        gist = _create_gist('delete-me')
 
        response = self.app.delete(url('gist', gist_id=gist.gist_id), status=403)
 

	
 
    def test_show(self):
 
        gist = _create_gist('gist-show-me')
 
        response = self.app.get(url('gist', gist_id=gist.gist_access_id))
 
        response.mustcontain('added file: gist-show-me<')
 
        response.mustcontain('test_admin - created')
 
        response.mustcontain('%s - created' % TEST_USER_ADMIN_LOGIN)
 
        response.mustcontain('gist-desc')
 
        response.mustcontain('<div class="btn btn-mini btn-success disabled">Public Gist</div>')
 

	
 
    def test_show_as_raw(self):
 
        gist = _create_gist('gist-show-me', content='GIST CONTENT')
 
        response = self.app.get(url('formatted_gist',
 
                                    gist_id=gist.gist_access_id, format='raw'))
 
        self.assertEqual(response.body, 'GIST CONTENT')
 

	
 
    def test_show_as_raw_individual_file(self):
 
        gist = _create_gist('gist-show-me-raw', content='GIST BODY')
 
        response = self.app.get(url('formatted_gist_file',
 
                                    gist_id=gist.gist_access_id, format='raw',
 
                                    revision='tip', f_path='gist-show-me-raw'))
 
        self.assertEqual(response.body, 'GIST BODY')
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_gist', gist_id=1))
kallithea/tests/functional/test_changeset_comments.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.model.db import ChangesetComment, Notification, \
 
    UserNotification
 
from kallithea.model.meta import Session
 

	
 

	
 
class TestChangeSetCommentsController(TestController):
 

	
 
    def setUp(self):
 
        for x in ChangesetComment.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
        for x in Notification.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
    def tearDown(self):
 
        for x in ChangesetComment.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
        for x in Notification.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
    def test_create(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params)
 
        # Test response...
 
        self.assertEqual(response.status, '302 Found')
 
        response.follow()
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 

	
 
        self.assertEqual(Notification.query().count(), 1)
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 

	
 
        notification = Notification.query().all()[0]
 

	
 
        ID = ChangesetComment.query().first().comment_id
 
        self.assertEqual(notification.type_,
 
                         Notification.TYPE_CHANGESET_COMMENT)
 
        sbj = (u'/vcs_test_hg/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % ID)
 
        print "%s vs %s" % (sbj, notification.subject)
 
        self.assertTrue(sbj in notification.subject)
 

	
 
    def test_create_inline(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 
        f_path = 'vcs/web/simplevcs/views/repository.py'
 
        line = 'n1'
 

	
 
        params = {'text': text, 'f_path': f_path, 'line': line, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params)
 
        # Test response...
 
        self.assertEqual(response.status, '302 Found')
 
        response.follow()
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        #test DB
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (1 inline, 0 general)'''
 
        )
 
        response.mustcontain(
 
            '''<div style="display:none" class="inline-comment-placeholder" '''
 
            '''path="vcs/web/simplevcs/views/repository.py" '''
 
            '''target_id="vcswebsimplevcsviewsrepositorypy">'''
 
        )
 

	
 
        self.assertEqual(Notification.query().count(), 1)
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 

	
 
        notification = Notification.query().all()[0]
 
        ID = ChangesetComment.query().first().comment_id
 
        self.assertEqual(notification.type_,
 
                         Notification.TYPE_CHANGESET_COMMENT)
 
        sbj = (u'/vcs_test_hg/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % ID)
 
        print "%s vs %s" % (sbj, notification.subject)
 
        self.assertTrue(sbj in notification.subject)
 

	
 
    def test_create_with_mention(self):
 
        self.log_user()
 

	
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'@test_regular check CommentOnRevision'
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params)
 
        # Test response...
 
        self.assertEqual(response.status, '302 Found')
 
        response.follow()
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 

	
 
        self.assertEqual(Notification.query().count(), 2)
 
        users = [x.user.username for x in UserNotification.query().all()]
 

	
 
        # test_regular gets notification by @mention
 
        self.assertEqual(sorted(users), [u'test_admin', u'test_regular'])
 
        self.assertEqual(sorted(users), [TEST_USER_ADMIN_LOGIN, u'test_regular'])
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params)
 

	
 
        comments = ChangesetComment.query().all()
 
        self.assertEqual(len(comments), 1)
 
        comment_id = comments[0].comment_id
 

	
 
        self.app.delete(url(controller='changeset',
 
                                    action='delete_comment',
 
                                    repo_name=HG_REPO,
 
                                    comment_id=comment_id))
 

	
 
        comments = ChangesetComment.query().all()
 
        self.assertEqual(len(comments), 0)
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 0 comments (0 inline, 0 general)'''
 
        )
kallithea/tests/functional/test_followers.py
Show inline comments
 
from kallithea.tests import *
 

	
 

	
 
class TestFollowersController(TestController):
 

	
 
    def test_index_hg(self):
 
        self.log_user()
 
        repo_name = HG_REPO
 
        response = self.app.get(url(controller='followers',
 
                                    action='followers',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain("""test_admin""")
 
        response.mustcontain(TEST_USER_ADMIN_LOGIN)
 
        response.mustcontain("""Started following""")
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        repo_name = GIT_REPO
 
        response = self.app.get(url(controller='followers',
 
                                    action='followers',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain("""test_admin""")
 
        response.mustcontain(TEST_USER_ADMIN_LOGIN)
 
        response.mustcontain("""Started following""")
kallithea/tests/functional/test_journal.py
Show inline comments
 
from kallithea.tests import *
 
import datetime
 

	
 

	
 
class TestJournalController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='journal', action='index'))
 

	
 
        response.mustcontain("""<div class="journal_day">%s</div>""" % datetime.date.today())
 

	
 
    def test_stop_following_repository(self):
 
        session = self.log_user()
 
#        usr = Session().query(User).filter(User.username == 'test_admin').one()
 
#        usr = Session().query(User).filter(User.username == TEST_USER_ADMIN_LOGIN).one()
 
#        repo = Session().query(Repository).filter(Repository.repo_name == HG_REPO).one()
 
#
 
#        followings = Session().query(UserFollowing)\
 
#            .filter(UserFollowing.user == usr)\
 
#            .filter(UserFollowing.follows_repository == repo).all()
 
#
 
#        assert len(followings) == 1, 'Not following any repository'
 
#
 
#        response = self.app.post(url(controller='journal',
 
#                                     action='toggle_following'),
 
#                                     {'follows_repo_id':repo.repo_id})
 

	
 
    def test_start_following_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='journal', action='index'),)
 

	
 
    def test_public_journal_atom(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='journal', action='public_journal_atom'),)
 

	
 
    def test_public_journal_rss(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='journal', action='public_journal_rss'),)
kallithea/tests/functional/test_login.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from __future__ import with_statement
 
import mock
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.utils2 import generate_api_key
 
from kallithea.lib.auth import check_password
 
from kallithea.lib import helpers as h
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model import validators
 
from kallithea.model.db import User, Notification
 
from kallithea.model.meta import Session
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestLoginController(TestController):
 

	
 
    def tearDown(self):
 
        for n in Notification.query().all():
 
            Session().delete(n)
 

	
 
        Session().commit()
 
        self.assertEqual(Notification.query().all(), [])
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertEqual(response.status, '200 OK')
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': 'test_admin',
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'test12'})
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertEqual(response.session['authuser'].get('username'),
 
                         'test_admin')
 
                         TEST_USER_ADMIN_LOGIN)
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': 'test_regular',
 
                                  'password': 'test12'})
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertEqual(response.session['authuser'].get('username'),
 
                         'test_regular')
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username': 'test_admin',
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'test12'})
 
        self.assertEqual(response.status, '302 Found')
 
        response = response.follow()
 

	
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('Users Administration')
 

	
 
    def test_logout(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': 'test_regular',
 
                                  'password': 'test12'})
 

	
 
        # Verify that a login session has been established.
 
        response = self.app.get(url(controller='login', action='index'))
 
        response = response.follow()
 
        self.assertIn('authuser', response.session)
 

	
 
        response.click('Log Out')
 

	
 
        # Verify that the login session has been terminated.
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertNotIn('authuser', response.session)
 

	
 
    @parameterized.expand([
 
          ('data:text/html,<script>window.alert("xss")</script>',),
 
          ('mailto:test@example.com',),
 
          ('file:///etc/passwd',),
 
          ('ftp://some.ftp.server',),
 
          ('http://other.domain/bl%C3%A5b%C3%A6rgr%C3%B8d',),
 
    ])
 
    def test_login_bad_came_froms(self, url_came_from):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=url_came_from),
 
                                 {'username': 'test_admin',
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'test12'})
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertEqual(response._environ['paste.testing_variables']
 
                         ['tmpl_context'].came_from, '/')
 
        response = response.follow()
 

	
 
        self.assertEqual(response.status, '200 OK')
 

	
 
    def test_login_short_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': 'test_admin',
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'as'})
 
        self.assertEqual(response.status, '200 OK')
 

	
 
        response.mustcontain('Enter 3 characters or more')
 

	
 
    def test_login_wrong_username_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': 'error',
 
                                  'password': 'test12'})
 

	
 
        response.mustcontain('invalid user name')
 
        response.mustcontain('invalid password')
 

	
 
    # verify that get arguments are correctly passed along login redirection
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_redirection_to_login_form_preserves_get_args(self, args, args_encoded):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='summary', action='index',
 
                                        repo_name=HG_REPO,
 
                                        **args))
 
            self.assertEqual(response.status, '302 Found')
 
            for encoded in args_encoded:
 
                self.assertIn(encoded, response.location)
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_preserves_get_args(self, args, args_encoded):
 
        response = self.app.get(url(controller='login', action='index',
 
                                    came_from = '/_admin/users',
 
                                    **args))
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.form.action)
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_redirection_after_successful_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from = '/_admin/users',
 
                                     **args),
 
                                 {'username': 'test_admin',
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'test12'})
 
        self.assertEqual(response.status, '302 Found')
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.location)
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_after_incorrect_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from = '/_admin/users',
 
                                     **args),
 
                                 {'username': 'error',
 
                                  'password': 'test12'})
 

	
 
        response.mustcontain('invalid user name')
 
        response.mustcontain('invalid password')
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.form.action)
 

	
 
    #==========================================================================
 
    # REGISTRATIONS
 
    #==========================================================================
 
    def test_register(self):
 
        response = self.app.get(url(controller='login', action='register'))
 
        response.mustcontain('Sign Up')
 

	
 
    def test_register_err_same_username(self):
 
        uname = 'test_admin'
 
        uname = TEST_USER_ADMIN_LOGIN
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': uname,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmail@domain.com',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': uname})
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'test_admin_0',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'test_admin@mail.com',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email_case_sensitive(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'test_admin_1',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'TesT_Admin@mail.COM',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 
        msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_wrong_data(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': 'test',
 
                                             'password_confirmation': 'test',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Enter a value 6 characters long or more')
 

	
 
    def test_register_err_username(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'error user',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Username may only contain '
 
                'alphanumeric characters underscores, '
 
                'periods or dashes and must begin with '
 
                'alphanumeric character')
 

	
 
    def test_register_err_case_sensitive(self):
 
        usr = 'Test_Admin'
 
        usr = TEST_USER_ADMIN_LOGIN.title()
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': usr,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': usr})
 
        response.mustcontain(msg)
 

	
 
    def test_register_special_chars(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                        {'username': 'xxxaxn',
 
                                         'password': 'ąćźżąśśśś',
 
                                         'password_confirmation': 'ąćźżąśśśś',
 
                                         'email': 'goodmailm@test.plx',
 
                                         'firstname': 'test',
 
                                         'lastname': 'test'})
 

	
 
        msg = validators.ValidPassword()._messages['invalid_password']
 
        response.mustcontain(msg)
 

	
 
    def test_register_password_mismatch(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': '123qwe',
 
                                             'password_confirmation': 'qwe123',
 
                                             'email': 'goodmailm@test.plxa',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 
        msg = validators.ValidPasswordsMatch()._messages['password_mismatch']
 
        response.mustcontain(msg)
 

	
 
    def test_register_ok(self):
 
        username = 'test_regular4'
 
        password = 'qweqwe'
 
        email = 'username@test.com'
 
        name = 'testname'
 
        lastname = 'testlastname'
 

	
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': username,
 
                                             'password': password,
 
                                             'password_confirmation': password,
 
                                             'email': email,
 
                                             'firstname': name,
 
                                             'lastname': lastname,
 
                                             'admin': True})  # This should be overriden
 
        self.assertEqual(response.status, '302 Found')
 
        self.checkSessionFlash(response, 'You have successfully registered into Kallithea')
 

	
 
        ret = Session().query(User).filter(User.username == 'test_regular4').one()
 
        self.assertEqual(ret.username, username)
 
        self.assertEqual(check_password(password, ret.password), True)
 
        self.assertEqual(ret.email, email)
 
        self.assertEqual(ret.name, name)
 
        self.assertEqual(ret.lastname, lastname)
 
        self.assertNotEqual(ret.api_key, None)
 
        self.assertEqual(ret.admin, False)
 

	
 
    def test_forgot_password_wrong_mail(self):
 
        bad_email = 'username%wrongmail.org'
 
        response = self.app.post(
 
                        url(controller='login', action='password_reset'),
 
                            {'email': bad_email, }
 
        )
 

	
 
        response.mustcontain('An email address must contain a single @')
 

	
 
    def test_forgot_password(self):
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset'))
 
        self.assertEqual(response.status, '200 OK')
 

	
 
        username = 'test_password_reset_1'
 
        password = 'qweqwe'
 
        email = 'username@python-works.com'
 
        name = 'passwd'
 
        lastname = 'reset'
 

	
 
        new = User()
 
        new.username = username
 
        new.password = password
 
        new.email = email
 
        new.name = name
 
        new.lastname = lastname
 
        new.api_key = generate_api_key(username)
 
        Session().add(new)
 
        Session().commit()
 

	
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset'),
 
                                 {'email': email, })
 

	
 
        self.checkSessionFlash(response, 'Your password reset link was sent')
 

	
 
        response = response.follow()
 

	
 
        # BAD KEY
 

	
 
        key = "bad"
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset_confirmation',
 
                                    key=key))
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertTrue(response.location.endswith(url('reset_password')))
 

	
 
        # GOOD KEY
 

	
 
        key = User.get_by_username(username).api_key
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset_confirmation',
 
                                    key=key))
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertTrue(response.location.endswith(url('login_home')))
 

	
 
        self.checkSessionFlash(response,
 
                               ('Your password reset was successful, '
 
                                'new password has been sent to your email'))
 

	
 
        response = response.follow()
 

	
 
    def _get_api_whitelist(self, values=None):
 
        config = {'api_access_controllers_whitelist': values or []}
 
        return config
 

	
 
    @parameterized.expand([
 
        ('none', None),
 
        ('empty_string', ''),
 
        ('fake_number', '123456'),
 
        ('proper_api_key', None)
 
    ])
 
    def test_access_not_whitelisted_page_via_api_key(self, test_name, api_key):
 
        whitelist = self._get_api_whitelist([])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual([],
 
                             whitelist['api_access_controllers_whitelist'])
 
            if test_name == 'proper_api_key':
 
                #use builtin if api_key is None
 
                api_key = User.get_first_admin().api_key
 

	
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=api_key),
 
                             status=403)
 

	
 
    @parameterized.expand([
 
        ('none', None, 302),
 
        ('empty_string', '', 302),
 
        ('fake_number', '123456', 302),
 
        ('proper_api_key', None, 200)
 
    ])
 
    def test_access_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 
            if test_name == 'proper_api_key':
 
                api_key = User.get_first_admin().api_key
 

	
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=api_key),
 
                             status=code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=new_api_key.api_key),
 
                             status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
kallithea/tests/functional/test_my_account.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from kallithea.model.db import User, UserFollowing, Repository, UserApiKeys
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib import helpers as h
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestMyAccountController(TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            Session().commit()
 

	
 
    def test_my_account(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account'))
 

	
 
        response.mustcontain('value="test_admin')
 
        response.mustcontain('value="%s' % TEST_USER_ADMIN_LOGIN)
 

	
 
    def test_my_account_my_repos(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_repos'))
 
        cnt = Repository.query().filter(Repository.user ==
 
                           User.get_by_username(TEST_USER_ADMIN_LOGIN)).count()
 
        response.mustcontain('"totalRecords": %s' % cnt)
 

	
 
    def test_my_account_my_watched(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_watched'))
 

	
 
        cnt = UserFollowing.query().filter(UserFollowing.user ==
 
                            User.get_by_username(TEST_USER_ADMIN_LOGIN)).count()
 
        response.mustcontain('"totalRecords": %s' % cnt)
 

	
 
    def test_my_account_my_emails(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 
    def test_my_account_my_emails_add_existing_email(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'new_email': TEST_USER_REGULAR_EMAIL, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'This e-mail address is already taken')
 

	
 
    def test_my_account_my_emails_add_mising_email_in_form(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 
        response = self.app.post(url('my_account_emails'),)
 
        self.checkSessionFlash(response, 'Please enter an email address')
 

	
 
    def test_my_account_my_emails_add_remove(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'new_email': 'foo@barz.com', '_authentication_token': self.authentication_token()})
 

	
 
        response = self.app.get(url('my_account_emails'))
 

	
 
        from kallithea.model.db import UserEmailMap
 
        email_id = UserEmailMap.query()\
 
            .filter(UserEmailMap.user == User.get_by_username(TEST_USER_ADMIN_LOGIN))\
 
            .filter(UserEmailMap.email == 'foo@barz.com').one().email_id
 

	
 
        response.mustcontain('foo@barz.com')
 
        response.mustcontain('<input id="del_email_id" name="del_email_id" type="hidden" value="%s" />' % email_id)
 

	
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'del_email_id': email_id, '_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'Removed email from user')
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 

	
 
    @parameterized.expand(
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
         #('extern_name', {'extern_name': 'test'}),
 
         #('extern_name', {'extern_name': None}),
 
         ('active', {'active': False}),
 
         ('active', {'active': True}),
 
         ('email', {'email': 'some@email.com'}),
 
        # ('new_password', {'new_password': 'foobar123',
 
        #                   'password_confirmation': 'foobar123'})
 
        ])
 
    def test_my_account_update(self, name, attrs):
 
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
 
                                  email='testme@example.com',
 
                                  extern_type='internal',
 
                                  extern_name=self.test_user_1,
 
                                  skip_if_exists=True)
 
        params = usr.get_api_data(True)  # current user data
 
        user_id = usr.user_id
 
        self.log_user(username=self.test_user_1, password='qweqwe')
 

	
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update({'extern_type': 'internal'})
 
        params.update({'extern_name': self.test_user_1})
 
        params.update({'_authentication_token': self.authentication_token()})
 

	
 
        params.update(attrs)
 
        response = self.app.post(url('my_account'), params)
 

	
 
        self.checkSessionFlash(response,
 
                               'Your account was updated successfully')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        params['last_login'] = updated_params['last_login']
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            #cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            #cannot update this via form, expected value is original one
 
            params['extern_name'] = str(user_id)
 
        if name == 'active':
 
            #my account cannot deactivate account
 
            params['active'] = True
 
        if name == 'admin':
 
            #my account cannot make you an admin !
 
            params['admin'] = False
 

	
 
        params.pop('_authentication_token')
 
        self.assertEqual(params, updated_params)
 

	
 
    def test_my_account_update_err_email_exists(self):
 
        self.log_user()
 

	
 
        new_email = 'test_regular@mail.com'  # already exisitn email
 
        response = self.app.post(url('my_account'),
 
                                params=dict(
 
                                    username='test_admin',
 
                                    username=TEST_USER_ADMIN_LOGIN,
 
                                    new_password='test12',
 
                                    password_confirmation='test122',
 
                                    firstname='NewName',
 
                                    lastname='NewLastname',
 
                                    email=new_email,
 
                                    _authentication_token=self.authentication_token())
 
                                )
 

	
 
        response.mustcontain('This e-mail address is already taken')
 

	
 
    def test_my_account_update_err(self):
 
        self.log_user('test_regular2', 'test12')
 

	
 
        new_email = 'newmail.pl'
 
        response = self.app.post(url('my_account'),
 
                                 params=dict(
 
                                            username='test_admin',
 
                                            username=TEST_USER_ADMIN_LOGIN,
 
                                            new_password='test12',
 
                                            password_confirmation='test122',
 
                                            firstname='NewName',
 
                                            lastname='NewLastname',
 
                                            email=new_email,
 
                                            _authentication_token=self.authentication_token()))
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        from kallithea.model import validators
 
        msg = validators.ValidUsername(edit=False, old_data={})\
 
                ._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': 'test_admin'})
 
        msg = h.html_escape(msg % {'username': TEST_USER_ADMIN_LOGIN})
 
        response.mustcontain(u"%s" % msg)
 

	
 
    def test_my_account_api_keys(self):
 
        usr = self.log_user('test_regular2', 'test12')
 
        user = User.get(usr['user_id'])
 
        response = self.app.get(url('my_account_api_keys'))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @parameterized.expand([
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_my_account_add_api_keys(self, desc, lifetime):
 
        usr = self.log_user('test_regular2', 'test12')
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(usr['user_id'])
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_my_account_remove_api_key(self):
 
        usr = self.log_user('test_regular2', 'test12')
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        #now delete our key
 
        keys = UserApiKeys.query().all()
 
        self.assertEqual(1, len(keys))
 

	
 
        response = self.app.post(url('my_account_api_keys'),
 
                 {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().all()
 
        self.assertEqual(0, len(keys))
 

	
 

	
 
    def test_my_account_reset_main_api_key(self):
 
        usr = self.log_user('test_regular2', 'test12')
 
        user = User.get(usr['user_id'])
 
        api_key = user.api_key
 
        response = self.app.get(url('my_account_api_keys'))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('my_account_api_keys'),
 
                 {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
kallithea/tests/functional/test_summary.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestSummaryController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(HG_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name=HG_REPO))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">hg"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
        # clone url...
 
        response.mustcontain('''id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/%s"''' % HG_REPO)
 
        response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_%s"''' % ID)
 
        response.mustcontain('''id="clone_url" readonly="readonly" value="http://%s@localhost:80/%s"''' % (TEST_USER_ADMIN_LOGIN, HG_REPO))
 
        response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://%s@localhost:80/_%s"''' % (TEST_USER_ADMIN_LOGIN, ID))
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name=GIT_REPO))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">git"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
        # clone url...
 
        response.mustcontain('''id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/%s"''' % GIT_REPO)
 
        response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_%s"''' % ID)
 
        response.mustcontain('''id="clone_url" readonly="readonly" value="http://%s@localhost:80/%s"''' % (TEST_USER_ADMIN_LOGIN, GIT_REPO))
 
        response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://%s@localhost:80/_%s"''' % (TEST_USER_ADMIN_LOGIN, ID))
 

	
 
    def test_index_by_id_hg(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(HG_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">hg"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
    def test_index_by_repo_having_id_path_in_name_hg(self):
 
        self.log_user()
 
        fixture.create_repo(name='repo_1')
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='repo_1'))
 

	
 
        try:
 
            response.mustcontain("repo_1")
 
        finally:
 
            RepoModel().delete(Repository.get_by_repo_name('repo_1'))
 
            Session().commit()
 

	
 
    def test_index_by_id_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">git"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
    def _enable_stats(self, repo):
 
        r = Repository.get_by_repo_name(repo)
 
        r.enable_statistics = True
 
        Session().add(r)
 
        Session().commit()
 

	
 
    def test_index_trending(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(HG_REPO)
 

	
 
        ScmModel().mark_for_invalidation(HG_REPO)
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=HG_REPO))
 
        response.mustcontain(
 
            '[["py", {"count": 68, "desc": ["Python"]}], '
 
            '["rst", {"count": 16, "desc": ["Rst"]}], '
 
            '["css", {"count": 2, "desc": ["Css"]}], '
 
            '["sh", {"count": 2, "desc": ["Bash"]}], '
 
            '["yml", {"count": 1, "desc": ["Yaml"]}], '
 
            '["makefile", {"count": 1, "desc": ["Makefile", "Makefile"]}], '
 
            '["js", {"count": 1, "desc": ["Javascript"]}], '
 
            '["cfg", {"count": 1, "desc": ["Ini"]}], '
 
            '["ini", {"count": 1, "desc": ["Ini"]}], '
 
            '["html", {"count": 1, "desc": ["EvoqueHtml", "Html"]}]];'
 
        )
 

	
 
    def test_index_statistics(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(HG_REPO)
 

	
 
        ScmModel().mark_for_invalidation(HG_REPO)
 
        response = self.app.get(url(controller='summary', action='statistics',
 
                                    repo_name=HG_REPO))
 

	
 
    def test_index_trending_git(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(GIT_REPO)
 

	
 
        ScmModel().mark_for_invalidation(GIT_REPO)
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=GIT_REPO))
 
        response.mustcontain(
 
            '[["py", {"count": 68, "desc": ["Python"]}], '
 
            '["rst", {"count": 16, "desc": ["Rst"]}], '
 
            '["css", {"count": 2, "desc": ["Css"]}], '
 
            '["sh", {"count": 2, "desc": ["Bash"]}], '
 
            '["makefile", {"count": 1, "desc": ["Makefile", "Makefile"]}], '
 
            '["js", {"count": 1, "desc": ["Javascript"]}], '
 
            '["cfg", {"count": 1, "desc": ["Ini"]}], '
 
            '["ini", {"count": 1, "desc": ["Ini"]}], '
 
            '["html", {"count": 1, "desc": ["EvoqueHtml", "Html"]}], '
 
            '["bat", {"count": 1, "desc": ["Batch"]}]];'
 
        )
 

	
 
    def test_index_statistics_git(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(GIT_REPO)
 

	
 
        ScmModel().mark_for_invalidation(GIT_REPO)
 
        response = self.app.get(url(controller='summary', action='statistics',
 
                                    repo_name=GIT_REPO))
kallithea/tests/scripts/manual_test_concurrency.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.test_hg_operations
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Test suite for making push/pull operations
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import os
 
import sys
 
import shutil
 
import logging
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from paste.deploy import appconfig
 
from sqlalchemy import engine_from_config
 

	
 
from kallithea.lib.utils import add_cache
 
from kallithea.model import init_model
 
from kallithea.model import meta
 
from kallithea.model.db import User, Repository
 
from kallithea.lib.auth import get_crypt_password
 

	
 
from kallithea.tests import TESTS_TMP_PATH, HG_REPO
 
from kallithea.config.environment import load_environment
 

	
 
rel_path = dn(dn(dn(dn(os.path.abspath(__file__)))))
 
conf = appconfig('config:development.ini', relative_to=rel_path)
 
load_environment(conf.global_conf, conf.local_conf)
 

	
 
add_cache(conf)
 

	
 
USER = 'test_admin'
 
USER = TEST_USER_ADMIN_LOGIN
 
PASS = 'test12'
 
HOST = 'server.local'
 
METHOD = 'pull'
 
DEBUG = True
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args):
 
        """Runs command on the system with given ``args``.
 
        """
 

	
 
        command = cmd + ' ' + ' '.join(args)
 
        log.debug('Executing %s' % command)
 
        if DEBUG:
 
            print command
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            print stdout, stderr
 
        return stdout, stderr
 

	
 

	
 
def get_session():
 
    engine = engine_from_config(conf, 'sqlalchemy.db1.')
 
    init_model(engine)
 
    sa = meta.Session
 
    return sa
 

	
 

	
 
def create_test_user(force=True):
 
    print 'creating test user'
 
    sa = get_session()
 

	
 
    user = sa.query(User).filter(User.username == USER).scalar()
 

	
 
    if force and user is not None:
 
        print 'removing current user'
 
        for repo in sa.query(Repository).filter(Repository.user == user).all():
 
            sa.delete(repo)
 
        sa.delete(user)
 
        sa.commit()
 

	
 
    if user is None or force:
 
        print 'creating new one'
 
        new_usr = User()
 
        new_usr.username = USER
 
        new_usr.password = get_crypt_password(PASS)
 
        new_usr.email = 'mail@mail.com'
 
        new_usr.name = 'test'
 
        new_usr.lastname = 'lasttestname'
 
        new_usr.active = True
 
        new_usr.admin = True
 
        sa.add(new_usr)
 
        sa.commit()
 

	
 
    print 'done'
 

	
 

	
 
def create_test_repo(force=True):
 
    print 'creating test repo'
 
    from kallithea.model.repo import RepoModel
 
    sa = get_session()
 

	
 
    user = sa.query(User).filter(User.username == USER).scalar()
 
    if user is None:
 
        raise Exception('user not found')
 

	
 
    repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
 

	
 
    if repo is None:
 
        print 'repo not found creating'
 

	
 
        form_data = {'repo_name': HG_REPO,
 
                     'repo_type': 'hg',
 
                     'private':False,
 
                     'clone_uri': '' }
 
        rm = RepoModel(sa)
 
        rm.base_path = '/home/hg'
 
        rm.create(form_data, user)
 

	
 
    print 'done'
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    sa = get_session()
 
    user = sa.query(User).filter(User.username == 'default').one()
 
    user.active = enable
 
    sa.add(user)
 
    sa.commit()
 

	
 

	
 
def get_anonymous_access():
 
    sa = get_session()
 
    return sa.query(User).filter(User.username == 'default').one().active
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 
def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
 
                                seq=None, backend='hg'):
 
    cwd = path = jn(TESTS_TMP_PATH, repo)
 

	
 
    if seq is None:
 
        seq = _RandomNameSequence().next()
 

	
 
    try:
 
        shutil.rmtree(path, ignore_errors=True)
 
        os.makedirs(path)
 
        #print 'made dirs %s' % jn(path)
 
    except OSError:
 
        raise
 

	
 
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
 
                  {'user': USER,
 
                   'pass': PASS,
 
                   'host': HOST,
 
                   'cloned_repo': repo, }
 

	
 
    dest = path + seq
 
    if method == 'pull':
 
        stdout, stderr = Command(cwd).execute(backend, method, '--cwd', dest, clone_url)
 
    else:
 
        stdout, stderr = Command(cwd).execute(backend, method, clone_url, dest)
 
        print stdout,'sdasdsadsa'
 
        if not no_errors:
 
            if backend == 'hg':
 
                assert """adding file changes""" in stdout, 'no messages about cloning'
 
                assert """abort""" not in stderr , 'got error from clone'
 
            elif backend == 'git':
 
                assert """Cloning into""" in stdout, 'no messages about cloning'
 

	
 
if __name__ == '__main__':
 
    try:
 
        create_test_user(force=False)
 
        seq = None
 
        import time
 

	
 
        try:
 
            METHOD = sys.argv[3]
 
        except IndexError:
 
            pass
 

	
 
        try:
 
            backend = sys.argv[4]
 
        except IndexError:
 
            backend = 'hg'
 

	
 
        if METHOD == 'pull':
 
            seq = _RandomNameSequence().next()
 
            test_clone_with_credentials(repo=sys.argv[1], method='clone',
 
                                        seq=seq, backend=backend)
 
        s = time.time()
 
        for i in range(1, int(sys.argv[2]) + 1):
 
            print 'take', i
 
            test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
 
                                        seq=seq, backend=backend)
 
        print 'time taken %.3f' % (time.time() - s)
 
    except Exception, e:
 
        sys.exit('stop on %s' % e)
0 comments (0 inline, 0 general)