Changeset - be1d366f461c
[Not reviewed]
default
0 16 0
Thomas De Schampheleire - 10 years ago 2016-05-14 21:04:26
thomas.de.schampheleire@gmail.com
pytest migration: functional: switch to standard assert statements

Use unittest2pytest to replace unittest-style assert statements (e.g.
assertEqual) with standard Python assert statements to benefit from pytest's
improved reporting on assert failures.

The conversion by unittest2pytest was correct, except for line wrapping
problems.
16 files changed with 204 insertions and 239 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/functional/test_admin_auth_settings.py
Show inline comments
 
@@ -44,14 +44,13 @@ class TestAuthSettingsController(TestCon
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
        new_settings = Setting.get_auth_settings()
 
        self.assertEqual(new_settings['auth_ldap_host'], u'dc.example.com',
 
                         'fail db write compare')
 
        assert new_settings['auth_ldap_host'] == u'dc.example.com', 'fail db write compare'
 

	
 
    @skipif(not ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    def test_ldap_error_form_wrong_port_number(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_ldap')
 
@@ -157,15 +156,15 @@ class TestAuthSettingsController(TestCon
 
            extra_environ={'THE_USER_NAME': 'johnd',
 
                           'THE_USER_EMAIL': 'john@example.org',
 
                           'THE_USER_FIRSTNAME': 'John',
 
                           'THE_USER_LASTNAME': 'Doe',
 
                           }
 
        )
 
        self.assertEqual(response.form['email'].value, 'john@example.org')
 
        self.assertEqual(response.form['firstname'].value, 'John')
 
        self.assertEqual(response.form['lastname'].value, 'Doe')
 
        assert response.form['email'].value == 'john@example.org'
 
        assert response.form['firstname'].value == 'John'
 
        assert response.form['lastname'].value == 'Doe'
 

	
 

	
 
    def test_container_auth_login_fallback_header(self):
 
        self._container_auth_setup(
 
            auth_container_header='THE_USER_NAME',
 
            auth_container_email_header='',
 
@@ -217,13 +216,13 @@ class TestAuthSettingsController(TestCon
 
            auth_container_clean_username='True',
 
        )
 
        response = self.app.get(
 
            url=url(controller='admin/my_account', action='my_account'),
 
            extra_environ={'REMOTE_USER': 'john'},
 
        )
 
        self.assertNotIn('Log Out', response.normal_body)
 
        assert 'Log Out' not in response.normal_body
 

	
 
    def test_crowd_save_settings(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_crowd')
 
        params.update({'auth_crowd_host': ' hostname ',
 
@@ -236,14 +235,13 @@ class TestAuthSettingsController(TestCon
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
        new_settings = Setting.get_auth_settings()
 
        self.assertEqual(new_settings['auth_crowd_host'], u'hostname',
 
                         'fail db write compare')
 
        assert new_settings['auth_crowd_host'] == u'hostname', 'fail db write compare'
 

	
 
    @skipif(not pam_lib_installed, reason='skipping due to missing pam lib')
 
    def test_pam_save_settings(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_pam')
 
@@ -254,8 +252,7 @@ class TestAuthSettingsController(TestCon
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
        new_settings = Setting.get_auth_settings()
 
        self.assertEqual(new_settings['auth_pam_service'], u'kallithea',
 
                         'fail db write compare')
 
        assert new_settings['auth_pam_service'] == u'kallithea', 'fail db write compare'
kallithea/tests/functional/test_admin_defaults.py
Show inline comments
 
@@ -37,13 +37,13 @@ class TestDefaultsController(TestControl
 
        }
 
        response = self.app.put(url('default', id='default'), params=params)
 
        self.checkSessionFlash(response, 'Default settings updated successfully')
 

	
 
        params.pop('_authentication_token')
 
        defs = Setting.get_default_repo_settings()
 
        self.assertEqual(params, defs)
 
        assert params == defs
 

	
 
    def test_update_params_false_git(self):
 
        self.log_user()
 
        params = {
 
            'default_repo_enable_locking': False,
 
            'default_repo_enable_downloads': False,
 
@@ -54,13 +54,13 @@ class TestDefaultsController(TestControl
 
        }
 
        response = self.app.put(url('default', id='default'), params=params)
 
        self.checkSessionFlash(response, 'Default settings updated successfully')
 

	
 
        params.pop('_authentication_token')
 
        defs = Setting.get_default_repo_settings()
 
        self.assertEqual(params, defs)
 
        assert params == defs
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('default', id=1), params=dict(_method='put', _authentication_token=self.authentication_token()))
 

	
 
    def test_delete(self):
 
        # Not possible due to CSRF protection.
kallithea/tests/functional/test_admin_gists.py
Show inline comments
 
@@ -160,17 +160,17 @@ class TestGistsController(TestController
 
        response.mustcontain('<div class="btn btn-mini btn-success disabled">Public Gist</div>')
 

	
 
    def test_show_as_raw(self):
 
        gist = _create_gist('gist-show-me', content='GIST CONTENT')
 
        response = self.app.get(url('formatted_gist',
 
                                    gist_id=gist.gist_access_id, format='raw'))
 
        self.assertEqual(response.body, 'GIST CONTENT')
 
        assert response.body == 'GIST CONTENT'
 

	
 
    def test_show_as_raw_individual_file(self):
 
        gist = _create_gist('gist-show-me-raw', content='GIST BODY')
 
        response = self.app.get(url('formatted_gist_file',
 
                                    gist_id=gist.gist_access_id, format='raw',
 
                                    revision='tip', f_path='gist-show-me-raw'))
 
        self.assertEqual(response.body, 'GIST BODY')
 
        assert response.body == 'GIST BODY'
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_gist', gist_id=1))
kallithea/tests/functional/test_admin_notifications.py
Show inline comments
 
@@ -49,24 +49,24 @@ class TestNotificationsController(TestCo
 
        Session().commit()
 
        u1 = User.get(u1.user_id)
 
        u2 = User.get(u2.user_id)
 

	
 
        # check DB
 
        get_notif = lambda un: [x.notification for x in un]
 
        self.assertEqual(get_notif(cur_user.notifications), [notification])
 
        self.assertEqual(get_notif(u1.notifications), [notification])
 
        self.assertEqual(get_notif(u2.notifications), [notification])
 
        assert get_notif(cur_user.notifications) == [notification]
 
        assert get_notif(u1.notifications) == [notification]
 
        assert get_notif(u2.notifications) == [notification]
 
        cur_usr_id = cur_user.user_id
 

	
 
        response = self.app.post(
 
            url('notification', notification_id=notification.notification_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.assertEqual(response.body, 'ok')
 
        assert response.body == 'ok'
 

	
 
        cur_user = User.get(cur_usr_id)
 
        self.assertEqual(cur_user.notifications, [])
 
        assert cur_user.notifications == []
 

	
 
    def test_show(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                          email='u1@example.com',
 
@@ -95,31 +95,25 @@ class TestNotificationsController(TestCo
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 

	
 
        description = NotificationModel().make_description(notification)
 
        self.assertEqual(
 
            description,
 
            "{0} sent message {1}".format(
 
        assert description == "{0} sent message {1}".format(
 
                cur_user.username,
 
                h.age(notification.created_on)
 
                )
 
            )
 

	
 
    def test_description_with_datetime(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 

	
 
        description = NotificationModel().make_description(notification, False)
 
        self.assertEqual(
 
            description,
 
            "{0} sent message at {1}".format(
 
        assert description == "{0} sent message at {1}".format(
 
                cur_user.username,
 
                h.fmt_date(notification.created_on)
 
                )
 
            )
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
@@ -50,23 +50,23 @@ class _BaseTestCase(TestControllerPytest
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        self.assertEqual(response.json, {u'result': True})
 
        assert response.json == {u'result': True}
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name, repo_name))
 

	
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 
        assert new_repo.repo_name == repo_name
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
@@ -98,32 +98,32 @@ class _BaseTestCase(TestControllerPytest
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                repo_group=gr.group_id,
 
                                                _authentication_token=self.authentication_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        self.assertEqual(response.json, {u'result': True})
 
        assert response.json == {u'result': True}
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 
        assert new_repo.repo_name == repo_name_full
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 1)
 
        assert len(inherited_perms) == 1
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
@@ -190,32 +190,32 @@ class _BaseTestCase(TestControllerPytest
 
                                                repo_description=description,
 
                                                repo_group=gr_allowed.group_id,
 
                                                _authentication_token=authentication_token))
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        self.assertEqual(response.json, {u'result': True})
 
        assert response.json == {u'result': True}
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 
        assert new_repo.repo_name == repo_name_full
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 1)
 
        assert len(inherited_perms) == 1
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
@@ -260,14 +260,14 @@ class _BaseTestCase(TestControllerPytest
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 
        assert new_repo.repo_name == repo_name_full
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
@@ -279,18 +279,18 @@ class _BaseTestCase(TestControllerPytest
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        #check if inherited permissiona are applied
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 2)
 
        assert len(inherited_perms) == 2
 

	
 
        self.assertTrue(TEST_USER_REGULAR_LOGIN in [x.user.username
 
                                                    for x in inherited_perms])
 
        self.assertTrue('repository.write' in [x.permission.permission_name
 
                                               for x in inherited_perms])
 
        assert TEST_USER_REGULAR_LOGIN in [x.user.username
 
                                                    for x in inherited_perms]
 
        assert 'repository.write' in [x.permission.permission_name
 
                                               for x in inherited_perms]
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_remote_repo_wrong_clone_uri(self):
 
@@ -337,14 +337,14 @@ class _BaseTestCase(TestControllerPytest
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name, repo_name))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 
        assert new_repo.repo_name == repo_name
 
        assert new_repo.description == description
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
@@ -362,16 +362,15 @@ class _BaseTestCase(TestControllerPytest
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 
        assert deleted_repo == None
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)),
 
                                  False)
 
        assert os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)) == False
 

	
 
    def test_delete_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (safe_str(self.NEW_REPO), non_ascii)
 
        repo_name_unicode = safe_unicode(repo_name)
 
@@ -382,22 +381,22 @@ class _BaseTestCase(TestControllerPytest
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        self.assertEqual(response.json, {u'result': True})
 
        assert response.json == {u'result': True}
 
        self.checkSessionFlash(response,
 
                               u'Created repository <a href="/%s">%s</a>'
 
                               % (urllib.quote(repo_name), repo_name_unicode))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_unicode)
 
        self.assertEqual(new_repo.description, description_unicode)
 
        assert new_repo.repo_name == repo_name_unicode
 
        assert new_repo.description == description_unicode
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
@@ -413,16 +412,15 @@ class _BaseTestCase(TestControllerPytest
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_unicode).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 
        assert deleted_repo == None
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_unicode)),
 
                                  False)
 
        assert os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_unicode)) == False
 

	
 
    def test_delete_repo_with_group(self):
 
        #TODO:
 
        pass
 

	
 
    def test_delete_browser_fakeout(self):
 
@@ -437,45 +435,45 @@ class _BaseTestCase(TestControllerPytest
 
        response = self.app.get(url('edit_repo', repo_name=self.REPO))
 

	
 
    def test_set_private_flag_sets_default_to_none(self):
 
        self.log_user()
 
        #initially repository perm should be read
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        self.assertTrue(len(perm), 1)
 
        self.assertEqual(perm[0].permission.permission_name, 'repository.read')
 
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)
 
        assert len(perm), 1
 
        assert perm[0].permission.permission_name == 'repository.read'
 
        assert Repository.get_by_repo_name(self.REPO).private == False
 

	
 
        response = self.app.put(url('put_repo', repo_name=self.REPO),
 
                        fixture._get_repo_create_params(repo_private=1,
 
                                                repo_name=self.REPO,
 
                                                repo_type=self.REPO_TYPE,
 
                                                user=TEST_USER_ADMIN_LOGIN,
 
                                                _authentication_token=self.authentication_token()))
 
        self.checkSessionFlash(response,
 
                               msg='Repository %s updated successfully' % (self.REPO))
 
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, True)
 
        assert Repository.get_by_repo_name(self.REPO).private == True
 

	
 
        #now the repo default permission should be None
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        self.assertTrue(len(perm), 1)
 
        self.assertEqual(perm[0].permission.permission_name, 'repository.none')
 
        assert len(perm), 1
 
        assert perm[0].permission.permission_name == 'repository.none'
 

	
 
        response = self.app.put(url('put_repo', repo_name=self.REPO),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=self.REPO,
 
                                                repo_type=self.REPO_TYPE,
 
                                                user=TEST_USER_ADMIN_LOGIN,
 
                                                _authentication_token=self.authentication_token()))
 
        self.checkSessionFlash(response,
 
                               msg='Repository %s updated successfully' % (self.REPO))
 
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)
 
        assert Repository.get_by_repo_name(self.REPO).private == False
 

	
 
        #we turn off private now the repo default permission should stay None
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        self.assertTrue(len(perm), 1)
 
        self.assertEqual(perm[0].permission.permission_name, 'repository.none')
 
        assert len(perm), 1
 
        assert perm[0].permission.permission_name == 'repository.none'
 

	
 
        #update this permission back
 
        perm[0].permission = Permission.get_by_key('repository.read')
 
        Session().add(perm[0])
 
        Session().commit()
 

	
 
@@ -588,16 +586,16 @@ class _BaseTestCase(TestControllerPytest
 
                                                _authentication_token=self.authentication_token()))
 

	
 
        self.checkSessionFlash(response,
 
                               'Error creating repository %s' % repo_name)
 
        # repo must not be in db
 
        repo = Repository.get_by_repo_name(repo_name)
 
        self.assertEqual(repo, None)
 
        assert repo == None
 

	
 
        # repo must not be in filesystem !
 
        self.assertFalse(os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 
        assert not os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name))
 

	
 
class TestAdminReposControllerGIT(_BaseTestCase):
 
    REPO = GIT_REPO
 
    REPO_TYPE = 'git'
 
    NEW_REPO = NEW_GIT_REPO
 
    OTHER_TYPE_REPO = HG_REPO
kallithea/tests/functional/test_admin_settings.py
Show inline comments
 
@@ -86,14 +86,13 @@ class TestAdminSettingsController(TestCo
 
                                 captcha_public_key='',
 
                                 _authentication_token=self.authentication_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 

	
 
        self.assertEqual(Setting
 
                         .get_app_settings()['ga_code'], new_ga_code)
 
        assert Setting.get_app_settings()['ga_code'] == new_ga_code
 

	
 
        response = response.follow()
 
        response.mustcontain("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code)
 

	
 
    def test_ga_code_inactive(self):
 
        self.log_user()
 
@@ -107,14 +106,13 @@ class TestAdminSettingsController(TestCo
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _authentication_token=self.authentication_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        self.assertEqual(Setting
 
                        .get_app_settings()['ga_code'], new_ga_code)
 
        assert Setting.get_app_settings()['ga_code'] == new_ga_code
 

	
 
        response = response.follow()
 
        response.mustcontain(no=["_gaq.push(['_setAccount', '%s']);" % new_ga_code])
 

	
 
    def test_captcha_activate(self):
 
        self.log_user()
 
@@ -128,14 +126,13 @@ class TestAdminSettingsController(TestCo
 
                                 captcha_private_key='1234567890',
 
                                 captcha_public_key='1234567890',
 
                                 _authentication_token=self.authentication_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        self.assertEqual(Setting
 
                        .get_app_settings()['captcha_private_key'], '1234567890')
 
        assert Setting.get_app_settings()['captcha_private_key'] == '1234567890'
 

	
 
        response = self.app.get(url('register'))
 
        response.mustcontain('captcha')
 

	
 
    def test_captcha_deactivate(self):
 
        self.log_user()
 
@@ -149,14 +146,13 @@ class TestAdminSettingsController(TestCo
 
                                 captcha_private_key='',
 
                                 captcha_public_key='1234567890',
 
                                 _authentication_token=self.authentication_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        self.assertEqual(Setting
 
                        .get_app_settings()['captcha_private_key'], '')
 
        assert Setting.get_app_settings()['captcha_private_key'] == ''
 

	
 
        response = self.app.get(url('register'))
 
        response.mustcontain(no=['captcha'])
 

	
 
    def test_title_change(self):
 
        self.log_user()
 
@@ -172,12 +168,10 @@ class TestAdminSettingsController(TestCo
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _authentication_token=self.authentication_token(),
 
                                ))
 

	
 
            self.checkSessionFlash(response, 'Updated application settings')
 
            self.assertEqual(Setting
 
                             .get_app_settings()['title'],
 
                             new_title.decode('utf-8'))
 
            assert Setting.get_app_settings()['title'] == new_title.decode('utf-8')
 

	
 
            response = response.follow()
 
            response.mustcontain("""<div class="branding">%s</div>""" % new_title)
kallithea/tests/functional/test_admin_user_groups.py
Show inline comments
 
@@ -57,13 +57,13 @@ class TestAdminUsersGroupsController(Tes
 
        response = self.app.post(url('users_group', id=gr.users_group_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 

	
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 
        assert gr == None
 

	
 
    def test_default_perms_enable_repository_read_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
@@ -87,17 +87,15 @@ class TestAdminUsersGroupsController(Tes
 
        p3 = Permission.get_by_key('hg.fork.none')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
        assert sorted([[x.users_group_id, x.permission_id, ] for x in perms]) == sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 
                    [ug.users_group_id, p3.permission_id]])
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.put(
 
            url('edit_user_group_default_perms', id=ug.users_group_id),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
@@ -109,34 +107,32 @@ class TestAdminUsersGroupsController(Tes
 

	
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
        assert sorted([[x.users_group_id, x.permission_id, ] for x in perms]) == sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 
                    [ug.users_group_id, p3.permission_id]])
 

	
 
        # DELETE !
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        ugid = ug.users_group_id
 
        response = self.app.post(url('users_group', id=ug.users_group_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        response = response.follow()
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 
        assert gr == None
 
        p = Permission.get_by_key('hg.create.repository')
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group_id == ugid).all()
 
        perms = [[x.users_group_id,
 
                  x.permission_id, ] for x in perms]
 
        self.assertEqual(perms, [])
 
        assert perms == []
 

	
 
    def test_default_perms_enable_repository_fork_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
@@ -160,17 +156,15 @@ class TestAdminUsersGroupsController(Tes
 
        p3 = Permission.get_by_key('hg.fork.repository')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
        assert sorted([[x.users_group_id, x.permission_id, ] for x in perms]) == sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 
                    [ug.users_group_id, p3.permission_id]])
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('edit_user_group_default_perms', id=ug.users_group_id),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
        response.follow()
 
@@ -180,38 +174,33 @@ class TestAdminUsersGroupsController(Tes
 
        p3 = Permission.get_by_key('hg.fork.none')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
        assert sorted([[x.users_group_id, x.permission_id, ] for x in perms]) == sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 
                    [ug.users_group_id, p3.permission_id]])
 

	
 
        # DELETE !
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        ugid = ug.users_group_id
 
        response = self.app.post(url('users_group', id=ug.users_group_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        response = response.follow()
 
        gr = Session().query(UserGroup) \
 
                           .filter(UserGroup.users_group_name ==
 
                                   users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 
        assert gr == None
 
        p = Permission.get_by_key('hg.fork.repository')
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group_id == ugid).all()
 
        perms = [[x.users_group_id,
 
                  x.permission_id, ] for x in perms]
 
        self.assertEqual(
 
            perms,
 
            []
 
        )
 
        assert perms == []
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('users_group', id=1),
 
                                 params=dict(_method='delete', _authentication_token=self.authentication_token()))
 

	
 
    def test_show(self):
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
@@ -67,17 +67,17 @@ class TestAdminUsersController(TestContr
 
        self.checkSessionFlash(response, '''Created user <a href="/_admin/users/''')
 
        self.checkSessionFlash(response, '''/edit">%s</a>''' % (username))
 

	
 
        new_user = Session().query(User). \
 
            filter(User.username == username).one()
 

	
 
        self.assertEqual(new_user.username, username)
 
        self.assertEqual(check_password(password, new_user.password), True)
 
        self.assertEqual(new_user.name, name)
 
        self.assertEqual(new_user.lastname, lastname)
 
        self.assertEqual(new_user.email, email)
 
        assert new_user.username == username
 
        assert check_password(password, new_user.password) == True
 
        assert new_user.name == name
 
        assert new_user.lastname == lastname
 
        assert new_user.email == email
 

	
 
        response.follow()
 
        response = response.follow()
 
        response.mustcontain("""newtestuser""")
 

	
 
    def test_create_err(self):
 
@@ -102,13 +102,14 @@ class TestAdminUsersController(TestContr
 
        response.mustcontain("""<span class="error-message">Please enter a value</span>""")
 
        response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
 

	
 
        def get_user():
 
            Session().query(User).filter(User.username == username).one()
 

	
 
        self.assertRaises(NoResultFound, get_user), 'found user in database'
 
        with pytest.raises(NoResultFound):
 
            get_user(), 'found user in database'
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_user'))
 

	
 
    @parametrize('name,attrs',
 
@@ -157,13 +158,13 @@ class TestAdminUsersController(TestContr
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        self.assertEqual(params, updated_params)
 
        assert params == updated_params
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        username = 'newtestuserdeleteme'
 

	
 
        fixture.create_user(name=username)
 
@@ -274,26 +275,26 @@ class TestAdminUsersController(TestContr
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(user, perm_create), False)
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_create) == False
 

	
 
            response = self.app.post(url('edit_user_perms', id=uid),
 
                                     params=dict(_method='put',
 
                                                 create_repo_perm=True,
 
                                                 _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), True)
 
            assert UserModel().has_perm(uid, perm_none) == False
 
            assert UserModel().has_perm(uid, perm_create) == True
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_create_repo(self):
 
        self.log_user()
 
@@ -305,24 +306,24 @@ class TestAdminUsersController(TestContr
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(user, perm_create), False)
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_create) == False
 

	
 
            response = self.app.post(url('edit_user_perms', id=uid),
 
                                     params=dict(_method='put', _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), True)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), False)
 
            assert UserModel().has_perm(uid, perm_none) == True
 
            assert UserModel().has_perm(uid, perm_create) == False
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_add_perm_fork_repo(self):
 
        self.log_user()
 
@@ -334,26 +335,26 @@ class TestAdminUsersController(TestContr
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(user, perm_fork), False)
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms', id=uid),
 
                                     params=dict(_method='put',
 
                                                 create_repo_perm=True,
 
                                                 _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), True)
 
            assert UserModel().has_perm(uid, perm_none) == False
 
            assert UserModel().has_perm(uid, perm_create) == True
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_fork_repo(self):
 
        self.log_user()
 
@@ -365,24 +366,24 @@ class TestAdminUsersController(TestContr
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(user, perm_fork), False)
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms', id=uid),
 
                                     params=dict(_method='put', _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), True)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), False)
 
            assert UserModel().has_perm(uid, perm_none) == True
 
            assert UserModel().has_perm(uid, perm_create) == False
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_ips(self):
 
        self.log_user()
 
@@ -484,19 +485,19 @@ class TestAdminUsersController(TestContr
 
                {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        #now delete our key
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        self.assertEqual(1, len(keys))
 
        assert 1 == len(keys)
 

	
 
        response = self.app.post(url('edit_user_api_keys', id=user_id),
 
                 {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        self.assertEqual(0, len(keys))
 
        assert 0 == len(keys)
 

	
 
    def test_reset_main_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        api_key = user.api_key
kallithea/tests/functional/test_changelog.py
Show inline comments
 
@@ -130,28 +130,28 @@ class TestChangelogController(TestContro
 

	
 
    def test_index_hg_with_filenode_that_is_dirnode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/tests',
 
                                    repo_name=HG_REPO))
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 

	
 
    def test_index_git_with_filenode_that_is_dirnode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/tests',
 
                                    repo_name=GIT_REPO))
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 

	
 
    def test_index_hg_with_filenode_not_existing(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/wrong_path',
 
                                    repo_name=HG_REPO))
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 

	
 
    def test_index_git_with_filenode_not_existing(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/wrong_path',
 
                                    repo_name=GIT_REPO))
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
kallithea/tests/functional/test_changeset_comments.py
Show inline comments
 
@@ -20,35 +20,34 @@ class TestChangeSetCommentsController(Te
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        assert ChangesetComment.query().count() == 1
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 

	
 
        self.assertEqual(Notification.query().count(), 1)
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        assert Notification.query().count() == 1
 
        assert ChangesetComment.query().count() == 1
 

	
 
        notification = Notification.query().all()[0]
 

	
 
        ID = ChangesetComment.query().first().comment_id
 
        self.assertEqual(notification.type_,
 
                         Notification.TYPE_CHANGESET_COMMENT)
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        self.assertTrue(sbj in notification.subject)
 
        assert sbj in notification.subject
 

	
 
    def test_create_inline(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 
        f_path = 'vcs/web/simplevcs/views/repository.py'
 
@@ -56,90 +55,89 @@ class TestChangeSetCommentsController(Te
 

	
 
        params = {'text': text, 'f_path': f_path, 'line': line, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        #test DB
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        assert ChangesetComment.query().count() == 1
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (1 inline, 0 general)'''
 
        )
 
        response.mustcontain(
 
            '''<div class="comments-list-chunk" '''
 
            '''data-f_path="vcs/web/simplevcs/views/repository.py" '''
 
            '''data-line_no="n1" data-target-id="vcswebsimplevcsviewsrepositorypy_n1">'''
 
        )
 

	
 
        self.assertEqual(Notification.query().count(), 1)
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        assert Notification.query().count() == 1
 
        assert ChangesetComment.query().count() == 1
 

	
 
        notification = Notification.query().all()[0]
 
        ID = ChangesetComment.query().first().comment_id
 
        self.assertEqual(notification.type_,
 
                         Notification.TYPE_CHANGESET_COMMENT)
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        self.assertTrue(sbj in notification.subject)
 
        assert sbj in notification.subject
 

	
 
    def test_create_with_mention(self):
 
        self.log_user()
 

	
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'@%s check CommentOnRevision' % TEST_USER_REGULAR_LOGIN
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        assert ChangesetComment.query().count() == 1
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 

	
 
        self.assertEqual(Notification.query().count(), 2)
 
        assert Notification.query().count() == 2
 
        users = [x.user.username for x in UserNotification.query().all()]
 

	
 
        # test_regular gets notification by @mention
 
        self.assertEqual(sorted(users), [TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN])
 
        assert sorted(users) == [TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN]
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 

	
 
        comments = ChangesetComment.query().all()
 
        self.assertEqual(len(comments), 1)
 
        assert len(comments) == 1
 
        comment_id = comments[0].comment_id
 

	
 
        self.app.post(url(controller='changeset',
 
                                    action='delete_comment',
 
                                    repo_name=HG_REPO,
 
                                    comment_id=comment_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 

	
 
        comments = ChangesetComment.query().all()
 
        self.assertEqual(len(comments), 0)
 
        assert len(comments) == 0
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 0 comments (0 inline, 0 general)'''
kallithea/tests/functional/test_compare.py
Show inline comments
 
@@ -452,21 +452,21 @@ class TestCompareController(TestControll
 
        self.r1_id = repo1.repo_id
 
        r1_name = repo1.repo_name
 

	
 
        cs0 = fixture.commit_change(repo=r1_name, filename='file1',
 
                content='line1', message='commit1', vcs_type='hg', newfile=True)
 
        Session().commit()
 
        self.assertEqual(repo1.scm_instance.revisions, [cs0.raw_id])
 
        assert repo1.scm_instance.revisions == [cs0.raw_id]
 
        #fork the repo1
 
        repo2 = fixture.create_repo(u'one-fork', repo_type='hg',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN,
 
                                    clone_uri=repo1.repo_full_path,
 
                                    fork_of='one')
 
        Session().commit()
 
        self.assertEqual(repo2.scm_instance.revisions, [cs0.raw_id])
 
        assert repo2.scm_instance.revisions == [cs0.raw_id]
 
        self.r2_id = repo2.repo_id
 
        r2_name = repo2.repo_name
 

	
 

	
 
        cs1 = fixture.commit_change(repo=r2_name, filename='file1-fork',
 
                content='file1-line1-from-fork', message='commit1-fork',
 
@@ -532,21 +532,21 @@ class TestCompareController(TestControll
 
        r1_name = repo1.repo_name
 

	
 
        cs0 = fixture.commit_change(repo=r1_name, filename='file1',
 
                content='line1', message='commit1', vcs_type='git',
 
                newfile=True)
 
        Session().commit()
 
        self.assertEqual(repo1.scm_instance.revisions, [cs0.raw_id])
 
        assert repo1.scm_instance.revisions == [cs0.raw_id]
 
        #fork the repo1
 
        repo2 = fixture.create_repo(u'one-git-fork', repo_type='git',
 
                                    repo_description='diff-test',
 
                                    cur_user=TEST_USER_ADMIN_LOGIN,
 
                                    clone_uri=repo1.repo_full_path,
 
                                    fork_of='one-git')
 
        Session().commit()
 
        self.assertEqual(repo2.scm_instance.revisions, [cs0.raw_id])
 
        assert repo2.scm_instance.revisions == [cs0.raw_id]
 
        self.r2_id = repo2.repo_id
 
        r2_name = repo2.repo_name
 

	
 

	
 
        cs1 = fixture.commit_change(repo=r2_name, filename='file1-fork',
 
                content='file1-line1-from-fork', message='commit1-fork',
kallithea/tests/functional/test_files.py
Show inline comments
 
@@ -112,22 +112,22 @@ removed extra unicode conversion in diff
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py'),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'},)
 
        self.assertEqual(response.body, HG_NODE_HISTORY)
 
        assert response.body == HG_NODE_HISTORY
 

	
 
    def test_file_source_history_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py'),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'},)
 
        self.assertEqual(response.body, GIT_NODE_HISTORY)
 
        assert response.body == GIT_NODE_HISTORY
 

	
 
    def test_file_annotation(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
@@ -151,24 +151,24 @@ removed extra unicode conversion in diff
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 

	
 
        self.assertEqual(response.body, HG_NODE_HISTORY)
 
        assert response.body == HG_NODE_HISTORY
 

	
 
    def test_file_annotation_history_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='history',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 

	
 
        self.assertEqual(response.body, GIT_NODE_HISTORY)
 
        assert response.body == GIT_NODE_HISTORY
 

	
 
    def test_file_authors(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='authors',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
@@ -196,20 +196,20 @@ removed extra unicode conversion in diff
 
            filename = '%s-%s' % (HG_REPO, short)
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 

	
 
            self.assertEqual(response.status, '200 OK')
 
            assert response.status == '200 OK'
 
            heads = [
 
                ('Pragma', 'no-cache'),
 
                ('Cache-Control', 'no-cache'),
 
                ('Content-Disposition', 'attachment; filename=%s' % filename),
 
                ('Content-Type', '%s; charset=utf-8' % info[0]),
 
            ]
 
            self.assertEqual(response.response._headers.items(), heads)
 
            assert response.response._headers.items() == heads
 

	
 
    def test_archival_wrong_ext(self):
 
        self.log_user()
 
        _set_downloads(HG_REPO, set_to=True)
 
        for arch_ext in ['tar', 'rar', 'x', '..ax', '.zipz']:
 
            fname = '27cd5cce30c96924232dffcd24178a07ffeb5dfc%s' % arch_ext
 
@@ -239,14 +239,14 @@ removed extra unicode conversion in diff
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        self.assertEqual(response.content_disposition, "attachment; filename=nodes.py")
 
        self.assertEqual(response.content_type, mimetypes.guess_type("nodes.py")[0])
 
        assert response.content_disposition == "attachment; filename=nodes.py"
 
        assert response.content_type == mimetypes.guess_type("nodes.py")[0]
 

	
 
    def test_raw_file_wrong_cs(self):
 
        self.log_user()
 
        rev = u'ERRORce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/nodes.py'
 

	
 
@@ -277,13 +277,13 @@ removed extra unicode conversion in diff
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        self.assertEqual(response.content_type, "text/plain")
 
        assert response.content_type == "text/plain"
 

	
 
    def test_raw_wrong_cs(self):
 
        self.log_user()
 
        rev = u'ERRORcce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/nodes.py'
 

	
kallithea/tests/functional/test_forks.py
Show inline comments
 
@@ -118,14 +118,14 @@ class _BaseTestCase(TestControllerPytest
 
                % (repo_name, fork_name_full, fork_name_full))
 

	
 
        #test if the fork was created in the database
 
        fork_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == fork_name_full).one()
 

	
 
        self.assertEqual(fork_repo.repo_name, fork_name_full)
 
        self.assertEqual(fork_repo.fork.repo_name, repo_name)
 
        assert fork_repo.repo_name == fork_name_full
 
        assert fork_repo.fork.repo_name == repo_name
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=fork_name_full))
 
        response.mustcontain(fork_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 
        response.mustcontain('Fork of "<a href="/%s">%s</a>"' % (repo_name, repo_name))
 
@@ -161,14 +161,14 @@ class _BaseTestCase(TestControllerPytest
 
                % (repo_name, fork_name, fork_name))
 

	
 
        #test if the fork was created in the database
 
        fork_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == fork_name).one()
 

	
 
        self.assertEqual(fork_repo.repo_name, fork_name)
 
        self.assertEqual(fork_repo.fork.repo_name, repo_name)
 
        assert fork_repo.repo_name == fork_name
 
        assert fork_repo.fork.repo_name == repo_name
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=fork_name))
 
        response.mustcontain(fork_name)
 
        response.mustcontain(self.REPO_TYPE)
 
        response.mustcontain('Fork of "<a href="/%s">%s</a>"' % (repo_name, repo_name))
 
@@ -177,13 +177,13 @@ class _BaseTestCase(TestControllerPytest
 
        usr = self.log_user(self.username, self.password)['user_id']
 
        repo_name = self.REPO
 

	
 
        forks = Repository.query() \
 
            .filter(Repository.repo_type == self.REPO_TYPE) \
 
            .filter(Repository.fork_id != None).all()
 
        self.assertEqual(1, len(forks))
 
        assert 1 == len(forks)
 

	
 
        # set read permissions for this
 
        RepoModel().grant_user_permission(repo=forks[0],
 
                                          user=usr,
 
                                          perm='repository.read')
 
        Session().commit()
 
@@ -197,13 +197,13 @@ class _BaseTestCase(TestControllerPytest
 
        usr = self.log_user(self.username, self.password)['user_id']
 
        repo_name = self.REPO
 

	
 
        forks = Repository.query() \
 
            .filter(Repository.repo_type == self.REPO_TYPE) \
 
            .filter(Repository.fork_id != None).all()
 
        self.assertEqual(1, len(forks))
 
        assert 1 == len(forks)
 

	
 
        # set none
 
        RepoModel().grant_user_permission(repo=forks[0],
 
                                          user=usr, perm='repository.none')
 
        Session().commit()
 
        # fork shouldn't be there
kallithea/tests/functional/test_login.py
Show inline comments
 
@@ -19,100 +19,98 @@ from kallithea.model.user import UserMod
 
fixture = Fixture()
 

	
 

	
 
class TestLoginController(TestControllerPytest):
 
    def setup_method(self, method):
 
        remove_all_notifications()
 
        self.assertEqual(Notification.query().all(), [])
 
        assert Notification.query().all() == []
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, TEST_USER_ADMIN_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_regular_email_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_EMAIL,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        response = response.follow()
 

	
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 
        response.mustcontain('Users Administration')
 

	
 
    def test_login_do_not_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': False})
 

	
 
        self.assertIn('Set-Cookie', response.headers)
 
        assert 'Set-Cookie' in response.headers
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            self.assertFalse(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
 
                'Cookie %r has expiration date, but should be a session cookie' % cookie)
 
            assert not re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r has expiration date, but should be a session cookie' % cookie
 

	
 
    def test_login_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': True})
 

	
 
        self.assertIn('Set-Cookie', response.headers)
 
        assert 'Set-Cookie' in response.headers
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            self.assertTrue(re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE),
 
                'Cookie %r should have expiration date, but is a session cookie' % cookie)
 
            assert re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r should have expiration date, but is a session cookie' % cookie
 

	
 
    def test_logout(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        # Verify that a login session has been established.
 
        response = self.app.get(url(controller='login', action='index'))
 
        response = response.follow()
 
        self.assertIn('authuser', response.session)
 
        assert 'authuser' in response.session
 

	
 
        response.click('Log Out')
 

	
 
        # Verify that the login session has been terminated.
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertNotIn('authuser', response.session)
 
        assert 'authuser' not in response.session
 

	
 
    @parametrize('url_came_from', [
 
          ('data:text/html,<script>window.alert("xss")</script>',),
 
          ('mailto:test@example.com',),
 
          ('file:///etc/passwd',),
 
          ('ftp://ftp.example.com',),
 
@@ -130,13 +128,13 @@ class TestLoginController(TestController
 
                                 status=400)
 

	
 
    def test_login_short_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'as'})
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 

	
 
        response.mustcontain('Enter 3 characters or more')
 

	
 
    def test_login_wrong_username_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': 'error',
 
@@ -153,43 +151,43 @@ class TestLoginController(TestController
 
    ])
 
    def test_redirection_to_login_form_preserves_get_args(self, args, args_encoded):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='summary', action='index',
 
                                        repo_name=HG_REPO,
 
                                        **args))
 
            self.assertEqual(response.status, '302 Found')
 
            assert response.status == '302 Found'
 
            came_from = urlparse.parse_qs(urlparse.urlparse(response.location).query)['came_from'][0]
 
            came_from_qs = urlparse.parse_qsl(urlparse.urlparse(came_from).query)
 
            for encoded in args_encoded:
 
                self.assertIn(encoded, came_from_qs)
 
                assert encoded in came_from_qs
 

	
 
    @parametrize('args,args_encoded', [
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå', 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_preserves_get_args(self, args, args_encoded):
 
        response = self.app.get(url(controller='login', action='index',
 
                                    came_from=url('/_admin/users', **args)))
 
        came_from = urlparse.parse_qs(urlparse.urlparse(response.form.action).query)['came_from'][0]
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, came_from)
 
            assert encoded in came_from
 

	
 
    @parametrize('args,args_encoded', [
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå', 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_redirection_after_successful_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from = url('/_admin/users', **args)),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.location)
 
            assert encoded in response.location
 

	
 
    @parametrize('args,args_encoded', [
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå', 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
@@ -199,13 +197,13 @@ class TestLoginController(TestController
 
                                 {'username': 'error',
 
                                  'password': 'test12'})
 

	
 
        response.mustcontain('Invalid username or password')
 
        came_from = urlparse.parse_qs(urlparse.urlparse(response.form.action).query)['came_from'][0]
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, came_from)
 
            assert encoded in came_from
 

	
 
    #==========================================================================
 
    # REGISTRATIONS
 
    #==========================================================================
 
    def test_register(self):
 
        response = self.app.get(url(controller='login', action='register'))
 
@@ -253,13 +251,13 @@ class TestLoginController(TestController
 
                                            {'username': 'xs',
 
                                             'password': 'test',
 
                                             'password_confirmation': 'test',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Enter a value 6 characters long or more')
 

	
 
    def test_register_err_username(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'error user',
 
@@ -325,23 +323,23 @@ class TestLoginController(TestController
 
                                             'password': password,
 
                                             'password_confirmation': password,
 
                                             'email': email,
 
                                             'firstname': name,
 
                                             'lastname': lastname,
 
                                             'admin': True})  # This should be overridden
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        self.checkSessionFlash(response, 'You have successfully registered into Kallithea')
 

	
 
        ret = Session().query(User).filter(User.username == 'test_regular4').one()
 
        self.assertEqual(ret.username, username)
 
        self.assertEqual(check_password(password, ret.password), True)
 
        self.assertEqual(ret.email, email)
 
        self.assertEqual(ret.name, name)
 
        self.assertEqual(ret.lastname, lastname)
 
        self.assertNotEqual(ret.api_key, None)
 
        self.assertEqual(ret.admin, False)
 
        assert ret.username == username
 
        assert check_password(password, ret.password) == True
 
        assert ret.email == email
 
        assert ret.name == name
 
        assert ret.lastname == lastname
 
        assert ret.api_key != None
 
        assert ret.admin == False
 

	
 
    #==========================================================================
 
    # PASSWORD RESET
 
    #==========================================================================
 

	
 
    def test_forgot_password_wrong_mail(self):
 
@@ -353,13 +351,13 @@ class TestLoginController(TestController
 

	
 
        response.mustcontain('An email address must contain a single @')
 

	
 
    def test_forgot_password(self):
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset'))
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 

	
 
        username = 'test_password_reset_1'
 
        password = 'qweqwe'
 
        email = 'username@example.com'
 
        name = u'passwd'
 
        lastname = u'reset'
 
@@ -392,13 +390,13 @@ class TestLoginController(TestController
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': token,
 
                                 })
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 
        response.mustcontain('Invalid password reset token')
 

	
 
        # GOOD TOKEN
 

	
 
        # TODO: The token should ideally be taken from the mail sent
 
        # above, instead of being recalculated.
 
@@ -408,24 +406,24 @@ class TestLoginController(TestController
 

	
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset_confirmation',
 
                                    email=email,
 
                                    timestamp=timestamp,
 
                                    token=token))
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 
        response.mustcontain("You are about to set a new password for the email address %s" % email)
 

	
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset_confirmation'),
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': token,
 
                                 })
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        self.checkSessionFlash(response, 'Successfully updated password')
 

	
 
        response = response.follow()
 

	
 
    #==========================================================================
 
    # API
 
@@ -441,14 +439,13 @@ class TestLoginController(TestController
 
        ('fake_number', '123456'),
 
        ('proper_api_key', None)
 
    ])
 
    def test_access_not_whitelisted_page_via_api_key(self, test_name, api_key):
 
        whitelist = self._get_api_whitelist([])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual([],
 
                             whitelist['api_access_controllers_whitelist'])
 
            assert [] == whitelist['api_access_controllers_whitelist']
 
            if test_name == 'proper_api_key':
 
                #use builtin if api_key is None
 
                api_key = User.get_first_admin().api_key
 

	
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
@@ -464,42 +461,39 @@ class TestLoginController(TestController
 
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 302),
 
        ('proper_api_key', None, 200)
 
    ])
 
    def test_access_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 
            if test_name == 'proper_api_key':
 
                api_key = User.get_first_admin().api_key
 

	
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=api_key),
 
                             status=code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=new_api_key.api_key),
 
                             status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            #patch the API key and make it expired
 
            new_api_key.expires = 0
 
            Session().add(new_api_key)
kallithea/tests/functional/test_my_account.py
Show inline comments
 
@@ -142,13 +142,13 @@ class TestMyAccountController(TestContro
 
            params['active'] = True
 
        if name == 'admin':
 
            #my account cannot make you an admin !
 
            params['admin'] = False
 

	
 
        params.pop('_authentication_token')
 
        self.assertEqual(params, updated_params)
 
        assert params == updated_params
 

	
 
    def test_my_account_update_err_email_exists(self):
 
        self.log_user()
 

	
 
        new_email = TEST_USER_REGULAR_EMAIL  # already existing email
 
        response = self.app.post(url('my_account'),
 
@@ -220,19 +220,19 @@ class TestMyAccountController(TestContro
 
                                 {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        #now delete our key
 
        keys = UserApiKeys.query().all()
 
        self.assertEqual(1, len(keys))
 
        assert 1 == len(keys)
 

	
 
        response = self.app.post(url('my_account_api_keys'),
 
                 {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().all()
 
        self.assertEqual(0, len(keys))
 
        assert 0 == len(keys)
 

	
 

	
 
    def test_my_account_reset_main_api_key(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        api_key = user.api_key
kallithea/tests/functional/test_pullrequests.py
Show inline comments
 
@@ -25,15 +25,15 @@ class TestPullrequestsController(TestCon
 
                                  'other_ref': 'branch:default:default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 }
 
                                )
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        response = response.follow()
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 
        response.mustcontain('This pull request has already been merged to default.')
 

	
 
    def test_create_with_existing_reviewer(self):
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
@@ -44,15 +44,15 @@ class TestPullrequestsController(TestCon
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': TEST_USER_ADMIN_LOGIN,
 
                                 }
 
                                )
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        response = response.follow()
 
        self.assertEqual(response.status, '200 OK')
 
        assert response.status == '200 OK'
 
        response.mustcontain('This pull request has already been merged to default.')
 

	
 
    def test_create_with_invalid_reviewer(self):
 
        invalid_user_name = 'invalid_user'
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
@@ -83,17 +83,17 @@ class TestPullrequestsController(TestCon
 
                                  'other_ref': 'branch:default:default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 }
 
                                )
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search('/pull-request/(\d+)/', response.location)
 
        self.assertNotEqual(m, None)
 
        assert m != None
 
        pull_request_id = m.group(1)
 

	
 
        # update it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
@@ -120,17 +120,17 @@ class TestPullrequestsController(TestCon
 
                                  'other_ref': 'branch:default:default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 }
 
                                )
 
        self.assertEqual(response.status, '302 Found')
 
        assert response.status == '302 Found'
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search('/pull-request/(\d+)/', response.location)
 
        self.assertNotEqual(m, None)
 
        assert m != None
 
        pull_request_id = m.group(1)
 

	
 
        # edit it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
@@ -156,78 +156,78 @@ class TestPullrequestsGetRepoRefs(TestCo
 
        Session.commit()
 
        Session.remove()
 

	
 
    def test_repo_refs_empty_repo(self):
 
        # empty repo with no commits, no branches, no bookmarks, just one tag
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        self.assertEqual(default, 'tag:null:0000000000000000000000000000000000000000')
 
        assert default == 'tag:null:0000000000000000000000000000000000000000'
 

	
 
    def test_repo_refs_one_commit_no_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        self.assertEqual(default, 'branch:default:%s' % cs0.raw_id)
 
        self.assertIn(([('branch:default:%s' % cs0.raw_id, 'default (current tip)')],
 
                'Branches'), refs)
 
        assert default == 'branch:default:%s' % cs0.raw_id
 
        assert ([('branch:default:%s' % cs0.raw_id, 'default (current tip)')],
 
                'Branches') in refs
 

	
 
    def test_repo_refs_one_commit_rev_hint(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance, rev=cs0.raw_id)
 
        expected = 'branch:default:%s' % cs0.raw_id
 
        self.assertEqual(default, expected)
 
        self.assertIn(([(expected, 'default (current tip)')], 'Branches'), refs)
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_two_commits_no_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.main.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        self.assertEqual(default, expected)
 
        self.assertIn(([(expected, 'default (current tip)')], 'Branches'), refs)
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_two_commits_rev_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.main.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance, rev=cs0.raw_id)
 
        expected = 'rev:%s:%s' % (cs0.raw_id, cs0.raw_id)
 
        self.assertEqual(default, expected)
 
        self.assertIn(([(expected, 'Changeset: %s' % cs0.raw_id[0:12])], 'Special'), refs)
 
        self.assertIn(([('branch:default:%s' % cs1.raw_id, 'default (current tip)')], 'Branches'), refs)
 
        assert default == expected
 
        assert ([(expected, 'Changeset: %s' % cs0.raw_id[0:12])], 'Special') in refs
 
        assert ([('branch:default:%s' % cs1.raw_id, 'default (current tip)')], 'Branches') in refs
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance, rev=cs1.raw_id)
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        self.assertEqual(default, expected)
 
        self.assertIn(([(expected, 'default (current tip)')], 'Branches'), refs)
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_two_commits_branch_hint(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.main.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance, branch='default')
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        self.assertEqual(default, expected)
 
        self.assertIn(([(expected, 'default (current tip)')], 'Branches'), refs)
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_one_branch_no_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        # TODO
0 comments (0 inline, 0 general)