Changeset - 5adc4ad9ce77
[Not reviewed]
kallithea/tests/functional/test_admin_auth_settings.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.model.db import Setting
 

	
 

	
 
class TestAuthSettingsController(TestController):
 
class TestAuthSettingsController(TestControllerPytest):
 
    def _enable_plugins(self, plugins_list):
 
        test_url = url(controller='admin/auth_settings',
 
                       action='auth_settings')
 
        params={'auth_plugins': plugins_list, '_authentication_token': self.authentication_token()}
 

	
 
        for plugin in plugins_list.split(','):
 
            enable = plugin.partition('kallithea.lib.auth_modules.')[-1]
 
            params.update({'%s_enabled' % enable: True})
 
        response = self.app.post(url=test_url, params=params)
 
        return params
 
        #self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/auth_settings',
 
                                    action='index'))
 
        response.mustcontain('Authentication Plugins')
 

	
 
    @skipif(not ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    def test_ldap_save_settings(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_ldap')
 
        params.update({'auth_ldap_host': u'dc.example.com',
 
                       'auth_ldap_port': '999',
 
                       'auth_ldap_tls_kind': 'PLAIN',
 
                       'auth_ldap_tls_reqcert': 'NEVER',
 
                       'auth_ldap_dn_user': 'test_user',
 
                       'auth_ldap_dn_pass': 'test_pass',
 
                       'auth_ldap_base_dn': 'test_base_dn',
 
                       'auth_ldap_filter': 'test_filter',
 
                       'auth_ldap_search_scope': 'BASE',
 
                       'auth_ldap_attr_login': 'test_attr_login',
 
                       'auth_ldap_attr_firstname': 'ima',
 
                       'auth_ldap_attr_lastname': 'tester',
 
                       'auth_ldap_attr_email': 'test@example.com'})
 

	
 
        test_url = url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
        new_settings = Setting.get_auth_settings()
 
        self.assertEqual(new_settings['auth_ldap_host'], u'dc.example.com',
 
                         'fail db write compare')
 

	
 
    @skipif(not ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    def test_ldap_error_form_wrong_port_number(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_ldap')
 
        params.update({'auth_ldap_host': '',
 
                       'auth_ldap_port': 'i-should-be-number',  # bad port num
 
                       'auth_ldap_tls_kind': 'PLAIN',
 
                       'auth_ldap_tls_reqcert': 'NEVER',
 
                       'auth_ldap_dn_user': '',
 
                       'auth_ldap_dn_pass': '',
 
                       'auth_ldap_base_dn': '',
 
                       'auth_ldap_filter': '',
 
                       'auth_ldap_search_scope': 'BASE',
 
                       'auth_ldap_attr_login': '',
 
                       'auth_ldap_attr_firstname': '',
 
                       'auth_ldap_attr_lastname': '',
 
                       'auth_ldap_attr_email': ''})
 
        test_url = url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 

	
 
        response.mustcontain("""<span class="error-message">"""
 
                             """Please enter a number</span>""")
 

	
 
    @skipif(not ldap_lib_installed, reason='skipping due to missing ldap lib')
 
    def test_ldap_error_form(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_ldap')
 
        params.update({'auth_ldap_host': 'Host',
 
                       'auth_ldap_port': '123',
 
                       'auth_ldap_tls_kind': 'PLAIN',
 
                       'auth_ldap_tls_reqcert': 'NEVER',
 
                       'auth_ldap_dn_user': '',
 
                       'auth_ldap_dn_pass': '',
 
                       'auth_ldap_base_dn': '',
 
                       'auth_ldap_filter': '',
 
                       'auth_ldap_search_scope': 'BASE',
 
                       'auth_ldap_attr_login': '',  # <----- missing required input
 
                       'auth_ldap_attr_firstname': '',
 
                       'auth_ldap_attr_lastname': '',
 
                       'auth_ldap_attr_email': ''})
 

	
 
        test_url = url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 

	
 
        response.mustcontain("""<span class="error-message">The LDAP Login"""
 
                             """ attribute of the CN must be specified""")
 

	
 
    def test_ldap_login(self):
 
        pass
 

	
 
    def test_ldap_login_incorrect(self):
 
        pass
 

	
 
    def _container_auth_setup(self, **settings):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_container')
 
        params.update(settings)
 

	
 
        test_url = url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        response = response.follow()
 
        response.click('Log Out') # end admin login session
 

	
 
    def _container_auth_verify_login(self, resulting_username, **get_kwargs):
 
        response = self.app.get(
 
            url=url(controller='admin/my_account', action='my_account'),
 
            **get_kwargs
 
        )
 
        response.mustcontain('My Account %s' % resulting_username)
 

	
 
    def test_container_auth_login_header(self):
 
        self._container_auth_setup(
 
            auth_container_header='THE_USER_NAME',
 
            auth_container_email_header='',
 
            auth_container_firstname_header='',
 
            auth_container_lastname_header='',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='False',
 
        )
 
        self._container_auth_verify_login(
 
            extra_environ={'THE_USER_NAME': 'john@example.org'},
 
            resulting_username='john@example.org',
 
        )
 

	
 
    def test_container_auth_login_header_attr(self):
 
        self._container_auth_setup(
 
            auth_container_header='THE_USER_NAME',
 
            auth_container_email_header='THE_USER_EMAIL',
 
            auth_container_firstname_header='THE_USER_FIRSTNAME',
 
            auth_container_lastname_header='THE_USER_LASTNAME',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='False',
 
        )
 
        response = self.app.get(
 
            url=url(controller='admin/my_account', action='my_account'),
 
            extra_environ={'THE_USER_NAME': 'johnd',
 
                           'THE_USER_EMAIL': 'john@example.org',
 
                           'THE_USER_FIRSTNAME': 'John',
 
                           'THE_USER_LASTNAME': 'Doe',
 
                           }
 
        )
 
        self.assertEqual(response.form['email'].value, 'john@example.org')
 
        self.assertEqual(response.form['firstname'].value, 'John')
 
        self.assertEqual(response.form['lastname'].value, 'Doe')
 

	
 

	
 
    def test_container_auth_login_fallback_header(self):
 
        self._container_auth_setup(
 
            auth_container_header='THE_USER_NAME',
 
            auth_container_email_header='',
 
            auth_container_firstname_header='',
 
            auth_container_lastname_header='',
 
            auth_container_fallback_header='HTTP_X_YZZY',
 
            auth_container_clean_username='False',
 
        )
 
        self._container_auth_verify_login(
 
            headers={'X-Yzzy': r'foo\bar'},
 
            resulting_username=r'foo\bar',
 
        )
 

	
 
    def test_container_auth_clean_username_at(self):
 
        self._container_auth_setup(
 
            auth_container_header='REMOTE_USER',
 
            auth_container_email_header='',
 
            auth_container_firstname_header='',
 
            auth_container_lastname_header='',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='True',
 
        )
 
        self._container_auth_verify_login(
 
            extra_environ={'REMOTE_USER': 'john@example.org'},
 
            resulting_username='john',
 
        )
 

	
 
    def test_container_auth_clean_username_backslash(self):
 
        self._container_auth_setup(
kallithea/tests/functional/test_admin_defaults.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.model.db import Setting
 

	
 

	
 
class TestDefaultsController(TestController):
 
class TestDefaultsController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('defaults'))
 
        response.mustcontain('default_repo_private')
 
        response.mustcontain('default_repo_enable_statistics')
 
        response.mustcontain('default_repo_enable_downloads')
 
        response.mustcontain('default_repo_enable_locking')
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_defaults', format='xml'))
 

	
 
    def test_create(self):
 
        response = self.app.post(url('defaults'),
 
            {'_authentication_token': self.authentication_token()})
 

	
 
    def test_new(self):
 
        response = self.app.get(url('new_default'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_default', format='xml'))
 

	
 
    def test_update_params_true_hg(self):
 
        self.log_user()
 
        params = {
 
            'default_repo_enable_locking': True,
 
            'default_repo_enable_downloads': True,
 
            'default_repo_enable_statistics': True,
 
            'default_repo_private': True,
 
            'default_repo_type': 'hg',
 
            '_authentication_token': self.authentication_token(),
 
        }
 
        response = self.app.put(url('default', id='default'), params=params)
 
        self.checkSessionFlash(response, 'Default settings updated successfully')
 

	
 
        params.pop('_authentication_token')
 
        defs = Setting.get_default_repo_settings()
 
        self.assertEqual(params, defs)
 

	
 
    def test_update_params_false_git(self):
 
        self.log_user()
 
        params = {
 
            'default_repo_enable_locking': False,
 
            'default_repo_enable_downloads': False,
 
            'default_repo_enable_statistics': False,
 
            'default_repo_private': False,
 
            'default_repo_type': 'git',
 
            '_authentication_token': self.authentication_token(),
 
        }
 
        response = self.app.put(url('default', id='default'), params=params)
 
        self.checkSessionFlash(response, 'Default settings updated successfully')
 

	
 
        params.pop('_authentication_token')
 
        defs = Setting.get_default_repo_settings()
 
        self.assertEqual(params, defs)
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('default', id=1), params=dict(_method='put', _authentication_token=self.authentication_token()))
 

	
 
    def test_delete(self):
 
        # Not possible due to CSRF protection.
 
        response = self.app.delete(url('default', id=1), status=405)
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('default', id=1), params=dict(_method='delete', _authentication_token=self.authentication_token()))
 

	
 
    def test_show(self):
 
        response = self.app.get(url('default', id=1))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_default', id=1, format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_default', id=1))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_default', id=1, format='xml'))
kallithea/tests/functional/test_admin_permissions.py
Show inline comments
 
from kallithea.model.db import User, UserIpMap
 
from kallithea.tests import *
 

	
 
class TestAdminPermissionsController(TestController):
 
class TestAdminPermissionsController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_permissions'))
 
        # Test response...
 

	
 
    def test_index_ips(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_permissions_ips'))
 
        # Test response...
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    def test_add_ips(self):
 
        self.log_user()
 
        default_user_id = User.get_default_user().user_id
 
        response = self.app.put(url('edit_user_ips', id=default_user_id),
 
                                 params=dict(new_ip='127.0.0.0/24',
 
                                 _authentication_token=self.authentication_token()))
 

	
 
        response = self.app.get(url('admin_permissions_ips'))
 
        response.mustcontain('127.0.0.0/24')
 
        response.mustcontain('127.0.0.0 - 127.0.0.255')
 

	
 
        ## delete
 
        default_user_id = User.get_default_user().user_id
 
        del_ip_id = UserIpMap.query().filter(UserIpMap.user_id ==
 
                                             default_user_id).first().ip_id
 

	
 
        response = self.app.post(url('edit_user_ips', id=default_user_id),
 
                                 params=dict(_method='delete',
 
                                             del_ip_id=del_ip_id,
 
                                             _authentication_token=self.authentication_token()))
 

	
 
        response = self.app.get(url('admin_permissions_ips'))
 
        response.mustcontain('All IP addresses are allowed')
 
        response.mustcontain(no=['127.0.0.0/24'])
 
        response.mustcontain(no=['127.0.0.0 - 127.0.0.255'])
 

	
 

	
 
    def test_index_overview(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_permissions_perms'))
 
        # Test response...
kallithea/tests/functional/test_admin_repo_groups.py
Show inline comments
 
from kallithea.tests import *
 

	
 
class TestRepoGroupsController(TestController):
 
class TestRepoGroupsController(TestControllerPytest):
 
    pass
kallithea/tests/functional/test_admin_settings.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from kallithea.model.db import Setting, Ui
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestAdminSettingsController(TestController):
 
class TestAdminSettingsController(TestControllerPytest):
 

	
 
    def test_index_main(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings'))
 

	
 
    def test_index_mapping(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_mapping'))
 

	
 
    def test_index_global(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_global'))
 

	
 
    def test_index_visual(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_visual'))
 

	
 
    def test_index_email(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_email'))
 

	
 
    def test_index_hooks(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_hooks'))
 

	
 
    def test_create_custom_hook(self):
 
        self.log_user()
 
        response = self.app.post(url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_1',
 
                                            new_hook_ui_value='cd /tmp',
 
                                            _authentication_token=self.authentication_token()))
 

	
 
        response = response.follow()
 
        response.mustcontain('test_hooks_1')
 
        response.mustcontain('cd /tmp')
 

	
 
    def test_create_custom_hook_delete(self):
 
        self.log_user()
 
        response = self.app.post(url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_2',
 
                                            new_hook_ui_value='cd /tmp2',
 
                                            _authentication_token=self.authentication_token()))
 

	
 
        response = response.follow()
 
        response.mustcontain('test_hooks_2')
 
        response.mustcontain('cd /tmp2')
 

	
 
        hook_id = Ui.get_by_key('hooks', 'test_hooks_2').ui_id
 
        ## delete
 
        self.app.post(url('admin_settings_hooks'),
 
                        params=dict(hook_id=hook_id, _authentication_token=self.authentication_token()))
 
        response = self.app.get(url('admin_settings_hooks'))
 
        response.mustcontain(no=['test_hooks_2'])
 
        response.mustcontain(no=['cd /tmp2'])
 

	
 
    def test_index_search(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_search'))
 

	
 
    def test_index_system(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_system'))
 

	
 
    def test_ga_code_active(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = 'ga-test-123456789'
 
        response = self.app.post(url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _authentication_token=self.authentication_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 

	
 
        self.assertEqual(Setting
 
                         .get_app_settings()['ga_code'], new_ga_code)
 

	
 
        response = response.follow()
 
        response.mustcontain("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code)
 

	
 
    def test_ga_code_inactive(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = ''
 
        response = self.app.post(url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _authentication_token=self.authentication_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        self.assertEqual(Setting
 
                        .get_app_settings()['ga_code'], new_ga_code)
 

	
 
        response = response.follow()
 
        response.mustcontain(no=["_gaq.push(['_setAccount', '%s']);" % new_ga_code])
 

	
 
    def test_captcha_activate(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = ''
 
        response = self.app.post(url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='1234567890',
 
                                 captcha_public_key='1234567890',
 
                                 _authentication_token=self.authentication_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        self.assertEqual(Setting
 
                        .get_app_settings()['captcha_private_key'], '1234567890')
 

	
 
        response = self.app.get(url('register'))
 
        response.mustcontain('captcha')
 

	
 
    def test_captcha_deactivate(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        old_realm = 'Kallithea authentication'
 
        new_ga_code = ''
 
        response = self.app.post(url('admin_settings_global'),
 
                        params=dict(title=old_title,
 
                                 realm=old_realm,
 
                                 ga_code=new_ga_code,
 
                                 captcha_private_key='',
 
                                 captcha_public_key='1234567890',
 
                                 _authentication_token=self.authentication_token(),
 
                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        self.assertEqual(Setting
 
                        .get_app_settings()['captcha_private_key'], '')
 

	
 
        response = self.app.get(url('register'))
 
        response.mustcontain(no=['captcha'])
 

	
 
    def test_title_change(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
 
        new_title = old_title + '_changed'
 
        old_realm = 'Kallithea authentication'
 

	
 
        for new_title in ['Changed', 'Żółwik', old_title]:
 
            response = self.app.post(url('admin_settings_global'),
 
                        params=dict(title=new_title,
 
                                 realm=old_realm,
 
                                 ga_code='',
 
                                 captcha_private_key='',
 
                                 captcha_public_key='',
 
                                 _authentication_token=self.authentication_token(),
 
                                ))
 

	
 
            self.checkSessionFlash(response, 'Updated application settings')
 
            self.assertEqual(Setting
 
                             .get_app_settings()['title'],
 
                             new_title.decode('utf-8'))
 

	
 
            response = response.follow()
 
            response.mustcontain("""<div class="branding">%s</div>""" % new_title)
kallithea/tests/functional/test_admin_user_groups.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from kallithea.tests import *
 
from kallithea.model.db import UserGroup, UserGroupToPerm, Permission
 
from kallithea.model.meta import Session
 

	
 
TEST_USER_GROUP = u'admins_test'
 

	
 

	
 
class TestAdminUsersGroupsController(TestController):
 
class TestAdminUsersGroupsController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users_groups'))
 
        # Test response...
 

	
 
    def test_create(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 

	
 
        self.checkSessionFlash(response,
 
                               'Created user group <a href="/_admin/user_groups/')
 
        self.checkSessionFlash(response,
 
                               '/edit">%s</a>' % TEST_USER_GROUP)
 

	
 
    def test_new(self):
 
        response = self.app.get(url('new_users_group'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('users_group', id=1), status=403)
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('users_group', id=1),
 
                                 params=dict(_method='put', _authentication_token=self.authentication_token()))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name':users_group_name,
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 

	
 
        self.checkSessionFlash(response,
 
                               'Created user group ')
 

	
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).one()
 

	
 
        response = self.app.post(url('users_group', id=gr.users_group_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 

	
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 

	
 
    def test_default_perms_enable_repository_read_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 

	
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        self.checkSessionFlash(response,
 
                               'Created user group ')
 
        ## ENABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('edit_user_group_default_perms',
 
                                    id=ug.users_group_id),
 
                                 {'create_repo_perm': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.repository')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
        p3 = Permission.get_by_key('hg.fork.none')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.put(
 
            url('edit_user_group_default_perms', id=ug.users_group_id),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
        p3 = Permission.get_by_key('hg.fork.none')
 

	
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 

	
 
        # DELETE !
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        ugid = ug.users_group_id
 
        response = self.app.post(url('users_group', id=ug.users_group_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        response = response.follow()
 
        gr = Session().query(UserGroup) \
 
            .filter(UserGroup.users_group_name == users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 
        p = Permission.get_by_key('hg.create.repository')
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group_id == ugid).all()
 
        perms = [[x.users_group_id,
 
                  x.permission_id, ] for x in perms]
 
        self.assertEqual(perms, [])
 

	
 
    def test_default_perms_enable_repository_fork_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USER_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'user_group_description': u'DESC',
 
                                  'active': True,
 
                                  '_authentication_token': self.authentication_token()})
 
        response.follow()
 

	
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        self.checkSessionFlash(response,
 
                               'Created user group ')
 
        ## ENABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('edit_user_group_default_perms',
 
                                    id=ug.users_group_id),
 
                                {'fork_repo_perm': True, '_authentication_token': self.authentication_token()})
 

	
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
        p3 = Permission.get_by_key('hg.fork.repository')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('edit_user_group_default_perms', id=ug.users_group_id),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
        response.follow()
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.usergroup.create.false')
 
        p3 = Permission.get_by_key('hg.fork.none')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
                    [ug.users_group_id, p2.permission_id],
 
                    [ug.users_group_id, p3.permission_id]]))
 

	
 
        # DELETE !
 
        ug = UserGroup.get_by_group_name(users_group_name)
 
        ugid = ug.users_group_id
 
        response = self.app.post(url('users_group', id=ug.users_group_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        response = response.follow()
 
        gr = Session().query(UserGroup) \
 
                           .filter(UserGroup.users_group_name ==
 
                                   users_group_name).scalar()
 

	
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
@@ -343,262 +343,262 @@ class TestAdminUsersController(TestContr
 
                                                 _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), True)
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_fork_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.fork.none')
 
        perm_fork = Permission.get_by_key('hg.fork.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
            self.assertEqual(UserModel().has_perm(user, perm_fork), False)
 

	
 
            response = self.app.post(url('edit_user_perms', id=uid),
 
                                     params=dict(_method='put', _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            self.assertEqual(UserModel().has_perm(uid, perm_none), True)
 
            self.assertEqual(UserModel().has_perm(uid, perm_create), False)
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_ips(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_ips', id=user.user_id))
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    @parameterized.expand([
 
        ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
 
        ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
 
        ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
 
        ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
 
        ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
 
        ('127_bad_ip', 'foobar', 'foobar', True),
 
    ])
 
    def test_add_ip(self, test_name, ip, ip_range, failure):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.put(url('edit_user_ips', id=user_id),
 
                                params=dict(new_ip=ip, _authentication_token=self.authentication_token()))
 

	
 
        if failure:
 
            self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response.mustcontain(no=[ip])
 
            response.mustcontain(no=[ip_range])
 

	
 
        else:
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response.mustcontain(ip)
 
            response.mustcontain(ip_range)
 

	
 
        ## cleanup
 
        for del_ip in UserIpMap.query().filter(UserIpMap.user_id == user_id).all():
 
            Session().delete(del_ip)
 
            Session().commit()
 

	
 
    def test_delete_ip(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        ip = '127.0.0.1/32'
 
        ip_range = '127.0.0.1 - 127.0.0.1'
 
        new_ip = UserModel().add_extra_ip(user_id, ip)
 
        Session().commit()
 
        new_ip_id = new_ip.ip_id
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response.mustcontain(ip)
 
        response.mustcontain(ip_range)
 

	
 
        self.app.post(url('edit_user_ips', id=user_id),
 
                      params=dict(_method='delete', del_ip_id=new_ip_id, _authentication_token=self.authentication_token()))
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response.mustcontain('All IP addresses are allowed')
 
        response.mustcontain(no=[ip])
 
        response.mustcontain(no=[ip_range])
 

	
 
    def test_api_keys(self):
 
        self.log_user()
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @parameterized.expand([
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_add_api_keys(self, desc, lifetime):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys', id=user_id),
 
                 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(user_id)
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_remove_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys', id=user_id),
 
                {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        #now delete our key
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        self.assertEqual(1, len(keys))
 

	
 
        response = self.app.post(url('edit_user_api_keys', id=user_id),
 
                 {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        self.assertEqual(0, len(keys))
 

	
 
    def test_reset_main_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        api_key = user.api_key
 
        response = self.app.get(url('edit_user_api_keys', id=user_id))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('edit_user_api_keys', id=user_id),
 
                 {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
 

	
 
# TODO To be uncommented when pytest is the test runner
 
#import pytest
 
#from kallithea.controllers.admin.users import UsersController
 
#class TestAdminUsersController_unittest(object):
 
#    """
 
#    Unit tests for the users controller
 
#    These are in a separate class, not deriving from TestController (and thus
 
#    unittest.TestCase), to be able to benefit from pytest features like
 
#    monkeypatch.
 
#    """
 
#    def test_get_user_or_raise_if_default(self, monkeypatch):
 
#        # flash complains about an unexisting session
 
#        def flash_mock(*args, **kwargs):
 
#            pass
 
#        monkeypatch.setattr(h, 'flash', flash_mock)
 
#
 
#        u = UsersController()
 
#        # a regular user should work correctly
 
#        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
#        assert u._get_user_or_raise_if_default(user.user_id) == user
 
#        # the default user should raise
 
#        with pytest.raises(HTTPNotFound):
 
#            u._get_user_or_raise_if_default(User.get_default_user().user_id)
 

	
 

	
 
class TestAdminUsersControllerForDefaultUser(TestController):
 
class TestAdminUsersControllerForDefaultUser(TestControllerPytest):
 
    """
 
    Edit actions on the default user are not allowed.
 
    Validate that they throw a 404 exception.
 
    """
 
    def test_edit_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user', id=user.user_id), status=404)
 

	
 
    def test_edit_advanced_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
 

	
 
    # API keys
 
    def test_edit_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
 

	
 
    def test_add_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys', id=user.user_id),
 
                 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
 

	
 
    def test_delete_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys', id=user.user_id),
 
                 {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404)
 

	
 
    # Permissions
 
    def test_edit_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
 

	
 
    def test_update_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_perms', id=user.user_id),
 
                 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
 

	
 
    # Emails
 
    def test_edit_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
 

	
 
    def test_add_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails', id=user.user_id),
 
                 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
 

	
 
    def test_delete_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails', id=user.user_id),
 
                 {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404)
 

	
 
    # IP addresses
 
    # Add/delete of IP addresses for the default user is used to maintain
 
    # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
 
    def test_edit_ip_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
kallithea/tests/functional/test_branches.py
Show inline comments
 
from kallithea.tests import *
 

	
 

	
 
class TestBranchesController(TestController):
 
class TestBranchesController(TestControllerPytest):
 

	
 
    def test_index_hg(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='branches',
 
                                    action='index', repo_name=HG_REPO))
 
        response.mustcontain("""<a href="/%s/changelog?branch=default">default</a>""" % HG_REPO)
 

	
 
        # closed branches
 
        response.mustcontain("""<a href="/%s/changelog?branch=git">git [closed]</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changelog?branch=web">web [closed]</a>""" % HG_REPO)
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='branches',
 
                                    action='index', repo_name=GIT_REPO))
 
        response.mustcontain("""<a href="/%s/changelog?branch=master">master</a>""" % GIT_REPO)
kallithea/tests/functional/test_changelog.py
Show inline comments
 
from kallithea.tests import *
 

	
 

	
 
class TestChangelogController(TestController):
 
class TestChangelogController(TestControllerPytest):
 

	
 
    def test_index_hg(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO))
 

	
 
        response.mustcontain('''id="chg_20" class="container mergerow"''')
 
        response.mustcontain(
 
            """<input class="changeset_range" """
 
            """id="7b22a518347bb9bc19679f6af07cd0a61bfe16e7" """
 
            """name="7b22a518347bb9bc19679f6af07cd0a61bfe16e7" """
 
            """type="checkbox" value="1" />"""
 
        )
 
        #rev 640: code garden
 
        response.mustcontain(
 
            """<span class="changeset_hash">r640:0a4e54a44604</span>"""
 
        )
 
        response.mustcontain("""code garden""")
 

	
 
    def test_index_pagination_hg(self):
 
        self.log_user()
 
        #pagination
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 1})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 2})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 3})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 4})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 5})
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=HG_REPO), {'page': 6, 'size': 20})
 

	
 
        # Test response after pagination...
 
        response.mustcontain(
 
            """<input class="changeset_range" """
 
            """id="22baf968d547386b9516965ce89d189665003a31" """
 
            """name="22baf968d547386b9516965ce89d189665003a31" """
 
            """type="checkbox" value="1" />"""
 
        )
 

	
 
        response.mustcontain(
 
            """<span class="changeset_hash">r539:22baf968d547</span>"""
 
        )
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO))
 

	
 
        response.mustcontain('''id="chg_20" class="container "''') # why no mergerow for git?
 
        response.mustcontain(
 
            """<input class="changeset_range" """
 
            """id="95f9a91d775b0084b2368ae7779e44931c849c0e" """
 
            """name="95f9a91d775b0084b2368ae7779e44931c849c0e" """
 
            """type="checkbox" value="1" />"""
 
        )
 

	
 
        response.mustcontain(
 
            """<span class="changeset_hash">r613:95f9a91d775b</span>"""
 
        )
 

	
 
        response.mustcontain("""fixing stupid typo in context for mercurial""")
 

	
 
#        response.mustcontain(
 
#            """<div id="changed_total_5e204e7583b9c8e7b93a020bd036564b1e731dae" """
 
#            """style="float:right;" class="changed_total tooltip" """
 
#            """title="Affected number of files, click to show """
 
#            """more details">3</div>"""
 
#        )
 

	
 
    def test_index_pagination_git(self):
 
        self.log_user()
 
        #pagination
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 1})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 2})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 3})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 4})
 
        self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 5})
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    repo_name=GIT_REPO), {'page': 6, 'size': 20})
 

	
 
        # Test response after pagination...
 
        response.mustcontain(
 
            """<input class="changeset_range" """
 
            """id="636ed213f2f11ef91071b9c24f2d5e6bd01a6ed5" """
 
            """name="636ed213f2f11ef91071b9c24f2d5e6bd01a6ed5" """
 
            """type="checkbox" value="1" />"""
 
        )
 

	
 
        response.mustcontain(
 
            """<span class="changeset_hash">r515:636ed213f2f1</span>"""
 
        )
 

	
 
    def test_index_hg_with_filenode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/vcs/exceptions.py',
 
                                    repo_name=HG_REPO))
 
        #history commits messages
 
        response.mustcontain('Added exceptions module, this time for real')
 
        response.mustcontain('Added not implemented hg backend test case')
 
        response.mustcontain('Added BaseChangeset class')
 
        # Test response...
 

	
 
    def test_index_git_with_filenode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/vcs/exceptions.py',
 
                                    repo_name=GIT_REPO))
 
        #history commits messages
 
        response.mustcontain('Added exceptions module, this time for real')
 
        response.mustcontain('Added not implemented hg backend test case')
 
        response.mustcontain('Added BaseChangeset class')
 

	
 
    def test_index_hg_with_filenode_that_is_dirnode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/tests',
 
                                    repo_name=HG_REPO))
 
        self.assertEqual(response.status, '302 Found')
 

	
 
    def test_index_git_with_filenode_that_is_dirnode(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/tests',
 
                                    repo_name=GIT_REPO))
 
        self.assertEqual(response.status, '302 Found')
 

	
 
    def test_index_hg_with_filenode_not_existing(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/wrong_path',
 
                                    repo_name=HG_REPO))
 
        self.assertEqual(response.status, '302 Found')
 

	
 
    def test_index_git_with_filenode_not_existing(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',
 
                                    revision='tip', f_path='/wrong_path',
 
                                    repo_name=GIT_REPO))
 
        self.assertEqual(response.status, '302 Found')
kallithea/tests/functional/test_changeset.py
Show inline comments
 
from kallithea.tests import *
 

	
 
class TestChangesetController(TestController):
 
class TestChangesetController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                    repo_name=HG_REPO, revision='tip'))
 
        # Test response...
 

	
 
    def test_changeset_range(self):
 
        #print self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO))
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                    repo_name=HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52...96507bd11ecc815ebc6270fdf6db110928c09c1e'))
 

	
 
        response = self.app.get(url(controller='changeset', action='changeset_raw',
 
                                    repo_name=HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52'))
 

	
 
        response = self.app.get(url(controller='changeset', action='changeset_patch',
 
                                    repo_name=HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52'))
 

	
 
        response = self.app.get(url(controller='changeset', action='changeset_download',
 
                                    repo_name=HG_REPO, revision='a53d9201d4bc278910d416d94941b7ea007ecd52'))
kallithea/tests/functional/test_compare_local.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from kallithea.tests import *
 

	
 
class TestCompareController(TestController):
 
class TestCompareController(TestControllerPytest):
 

	
 
    def test_compare_tag_hg(self):
 
        self.log_user()
 
        tag1 = 'v0.1.2'
 
        tag2 = 'v0.1.3'
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=HG_REPO,
 
                                    org_ref_type="tag",
 
                                    org_ref_name=tag1,
 
                                    other_ref_type="tag",
 
                                    other_ref_name=tag2,
 
                                    ), status=200)
 
        response.mustcontain('%s@%s' % (HG_REPO, tag1))
 
        response.mustcontain('%s@%s' % (HG_REPO, tag2))
 

	
 
        ## outgoing changesets between tags
 
        response.mustcontain('''<a href="/%s/changeset/c5ddebc06eaaba3010c2d66ea6ec9d074eb0f678">r112:c5ddebc06eaa</a>''' % HG_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/70d4cef8a37657ee4cf5aabb3bd9f68879769816">r115:70d4cef8a376</a>''' % HG_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/9749bfbfc0d2eba208d7947de266303b67c87cda">r116:9749bfbfc0d2</a>''' % HG_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/41fda979f02fda216374bf8edac4e83f69e7581c">r117:41fda979f02f</a>''' % HG_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/bb1a3ab98cc45cb934a77dcabf87a5a598b59e97">r118:bb1a3ab98cc4</a>''' % HG_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/36e0fc9d2808c5022a24f49d6658330383ed8666">r119:36e0fc9d2808</a>''' % HG_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/17544fbfcd33ffb439e2b728b5d526b1ef30bfcf">r120:17544fbfcd33</a>''' % HG_REPO)
 

	
 
        response.mustcontain('11 files changed with 94 insertions and 64 deletions')
 

	
 
        ## files diff
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-A"></i>
 
                             <a href="#C--1c5cf9e91c12">docs/api/utils/index.rst</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-A"></i>
 
                             <a href="#C--e3305437df55">test_and_report.sh</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-M"></i>
 
                             <a href="#C--c8e92ef85cd1">.hgignore</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-M"></i>
 
                             <a href="#C--6e08b694d687">.hgtags</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-M"></i>
 
                             <a href="#C--2c14b00f3393">docs/api/index.rst</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-M"></i>
 
                             <a href="#C--430ccbc82bdf">vcs/__init__.py</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-M"></i>
 
                             <a href="#C--9c390eb52cd6">vcs/backends/hg.py</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-M"></i>
 
                             <a href="#C--ebb592c595c0">vcs/utils/__init__.py</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-M"></i>
 
                             <a href="#C--7abc741b5052">vcs/utils/annotate.py</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-M"></i>
 
                             <a href="#C--2ef0ef106c56">vcs/utils/diffs.py</a>''')
 
        response.mustcontain('''<div class="node">
 
                             <i class="icon-diff-M"></i>
 
                             <a href="#C--3150cb87d4b7">vcs/utils/lazy.py</a>''')
 

	
 
    def test_compare_tag_git(self):
 
        self.log_user()
 
        tag1 = 'v0.1.2'
 
        tag2 = 'v0.1.3'
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=GIT_REPO,
 
                                    org_ref_type="tag",
 
                                    org_ref_name=tag1,
 
                                    other_ref_type="tag",
 
                                    other_ref_name=tag2,
 
                                    ), status=200)
 
        response.mustcontain('%s@%s' % (GIT_REPO, tag1))
 
        response.mustcontain('%s@%s' % (GIT_REPO, tag2))
 

	
 
        ## outgoing changesets between tags
 
        response.mustcontain('''<a href="/%s/changeset/794bbdd31545c199f74912709ea350dedcd189a2">r113:794bbdd31545</a>''' % GIT_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/e36d8c5025329bdd4212bd53d4ed8a70ff44985f">r115:e36d8c502532</a>''' % GIT_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/5c9ff4f6d7508db0e72b1d2991c357d0d8e07af2">r116:5c9ff4f6d750</a>''' % GIT_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/b7187fa2b8c1d773ec35e9dee12f01f74808c879">r117:b7187fa2b8c1</a>''' % GIT_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/5f3b74262014a8de2dc7dade1152de9fd0c8efef">r118:5f3b74262014</a>''' % GIT_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/17438a11f72b93f56d0e08e7d1fa79a378578a82">r119:17438a11f72b</a>''' % GIT_REPO)
 
        response.mustcontain('''<a href="/%s/changeset/5a3a8fb005554692b16e21dee62bf02667d8dc3e">r120:5a3a8fb00555</a>''' % GIT_REPO)
 

	
 
        response.mustcontain('11 files changed with 94 insertions and 64 deletions')
 

	
 
        #files
 
        response.mustcontain('''<a href="#C--1c5cf9e91c12">docs/api/utils/index.rst</a>''')
 
        response.mustcontain('''<a href="#C--e3305437df55">test_and_report.sh</a>''')
 
        response.mustcontain('''<a href="#C--c8e92ef85cd1">.hgignore</a>''')
 
        response.mustcontain('''<a href="#C--6e08b694d687">.hgtags</a>''')
 
        response.mustcontain('''<a href="#C--2c14b00f3393">docs/api/index.rst</a>''')
 
        response.mustcontain('''<a href="#C--430ccbc82bdf">vcs/__init__.py</a>''')
 
        response.mustcontain('''<a href="#C--9c390eb52cd6">vcs/backends/hg.py</a>''')
 
        response.mustcontain('''<a href="#C--ebb592c595c0">vcs/utils/__init__.py</a>''')
 
        response.mustcontain('''<a href="#C--7abc741b5052">vcs/utils/annotate.py</a>''')
 
        response.mustcontain('''<a href="#C--2ef0ef106c56">vcs/utils/diffs.py</a>''')
 
        response.mustcontain('''<a href="#C--3150cb87d4b7">vcs/utils/lazy.py</a>''')
 

	
 
    def test_index_branch_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=HG_REPO,
 
                                    org_ref_type="branch",
 
                                    org_ref_name='default',
 
                                    other_ref_type="branch",
 
                                    other_ref_name='default',
 
                                    ))
 

	
 
        response.mustcontain('%s@default' % (HG_REPO))
 
        response.mustcontain('%s@default' % (HG_REPO))
 
        # branch are equal
 
        response.mustcontain('<span class="empty_data">No files</span>')
 
        response.mustcontain('<span class="empty_data">No changesets</span>')
 

	
 
    def test_index_branch_git(self):
 
        self.log_user()
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=GIT_REPO,
 
                                    org_ref_type="branch",
 
                                    org_ref_name='master',
 
                                    other_ref_type="branch",
 
                                    other_ref_name='master',
 
                                    ))
 

	
 
        response.mustcontain('%s@master' % (GIT_REPO))
 
        response.mustcontain('%s@master' % (GIT_REPO))
 
        # branch are equal
 
        response.mustcontain('<span class="empty_data">No files</span>')
 
        response.mustcontain('<span class="empty_data">No changesets</span>')
 

	
 
    def test_compare_revisions_hg(self):
 
        self.log_user()
 
        rev1 = 'b986218ba1c9'
 
        rev2 = '3d8f361e72ab'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=HG_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    ))
 
        response.mustcontain('%s@%s' % (HG_REPO, rev1))
 
        response.mustcontain('%s@%s' % (HG_REPO, rev2))
 

	
 
        ## outgoing changesets between those revisions
 
        response.mustcontain("""<a href="/%s/changeset/3d8f361e72ab303da48d799ff1ac40d5ac37c67e">r1:%s</a>""" % (HG_REPO, rev2))
 

	
 
        response.mustcontain('1 file changed with 7 insertions and 0 deletions')
 
        ## files
 
        response.mustcontain("""<a href="#C--c8e92ef85cd1">.hgignore</a>""")
 

	
 
    def test_compare_revisions_git(self):
 
        self.log_user()
 
        rev1 = 'c1214f7e79e02fc37156ff215cd71275450cffc3'
 
        rev2 = '38b5fe81f109cb111f549bfe9bb6b267e10bc557'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=GIT_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    ))
 
        response.mustcontain('%s@%s' % (GIT_REPO, rev1))
 
        response.mustcontain('%s@%s' % (GIT_REPO, rev2))
 

	
 
        ## outgoing changesets between those revisions
 
        response.mustcontain("""<a href="/%s/changeset/38b5fe81f109cb111f549bfe9bb6b267e10bc557">r1:%s</a>""" % (GIT_REPO, rev2[:12]))
 
        response.mustcontain('1 file changed with 7 insertions and 0 deletions')
 

	
 
        ## files
 
        response.mustcontain("""<a href="#C--c8e92ef85cd1">.hgignore</a>""")
 

	
 
    def test_compare_revisions_hg_as_form(self):
 
        self.log_user()
 
        rev1 = 'b986218ba1c9'
 
        rev2 = '3d8f361e72ab'
 

	
 
        response = self.app.get(url('compare_url',
 
                                    repo_name=HG_REPO,
 
                                    org_ref_type="rev",
 
                                    org_ref_name=rev1,
 
                                    other_ref_type="rev",
 
                                    other_ref_name=rev2,
 
                                    as_form=True,
 
                                    ),
 
                                extra_environ={'HTTP_X_PARTIAL_XHR': '1'},)
 

	
 
        ## outgoing changesets between those revisions
 
        response.mustcontain("""<a href="/%s/changeset/3d8f361e72ab303da48d799ff1ac40d5ac37c67e">r1:%s</a>""" % (HG_REPO, rev2))
kallithea/tests/functional/test_feed.py
Show inline comments
 
from kallithea.tests import *
 

	
 
class TestFeedController(TestController):
 
class TestFeedController(TestControllerPytest):
 

	
 
    def test_rss(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='feed', action='rss',
 
                                    repo_name=HG_REPO))
 

	
 

	
 

	
 
        assert response.content_type == "application/rss+xml"
 
        assert """<rss version="2.0">""" in response
 

	
 
    def test_atom(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='feed', action='atom',
 
                                    repo_name=HG_REPO))
 

	
 
        assert response.content_type == """application/atom+xml"""
 
        assert """<?xml version="1.0" encoding="utf-8"?>""" in response
 
        assert """<feed xmlns="http://www.w3.org/2005/Atom" xml:lang="en-us">""" in response
kallithea/tests/functional/test_followers.py
Show inline comments
 
from kallithea.tests import *
 

	
 

	
 
class TestFollowersController(TestController):
 
class TestFollowersController(TestControllerPytest):
 

	
 
    def test_index_hg(self):
 
        self.log_user()
 
        repo_name = HG_REPO
 
        response = self.app.get(url(controller='followers',
 
                                    action='followers',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain(TEST_USER_ADMIN_LOGIN)
 
        response.mustcontain("""Started following""")
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        repo_name = GIT_REPO
 
        response = self.app.get(url(controller='followers',
 
                                    action='followers',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain(TEST_USER_ADMIN_LOGIN)
 
        response.mustcontain("""Started following""")
kallithea/tests/functional/test_home.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.meta import Session
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestHomeController(TestController):
 
class TestHomeController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='home', action='index'))
 
        #if global permission is set
 
        response.mustcontain('Add Repository')
 
        # html in javascript variable:
 
        response.mustcontain('var data = {"totalRecords": %s' % len(Repository.getAll()))
 
        response.mustcontain(r'href=\"/%s\"' % HG_REPO)
 

	
 
        response.mustcontain(r'<span class="repotag">git')
 
        response.mustcontain(r'<i class=\"icon-globe\"')
 

	
 
        response.mustcontain("""fixes issue with having custom format for git-log""")
 
        response.mustcontain("""/%s/changeset/5f2c6ee195929b0be80749243c18121c9864a3b3""" % GIT_REPO)
 

	
 
        response.mustcontain("""disable security checks on hg clone for travis""")
 
        response.mustcontain("""/%s/changeset/96507bd11ecc815ebc6270fdf6db110928c09c1e""" % HG_REPO)
 

	
 
    def test_repo_summary_with_anonymous_access_disabled(self):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='summary',
 
                                        action='index', repo_name=HG_REPO),
 
                                        status=302)
 
            assert 'login' in response.location
 

	
 
    def test_index_with_anonymous_access_disabled(self):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='home', action='index'),
 
                                    status=302)
 
            assert 'login' in response.location
 

	
 
    def test_index_page_on_groups(self):
 
        self.log_user()
 
        gr = fixture.create_repo_group(u'gr1')
 
        fixture.create_repo(name=u'gr1/repo_in_group', repo_group=gr)
 
        response = self.app.get(url('repos_group_home', group_name=u'gr1'))
 

	
 
        try:
 
            response.mustcontain(u"gr1/repo_in_group")
 
        finally:
 
            RepoModel().delete(u'gr1/repo_in_group')
 
            RepoGroupModel().delete(repo_group=u'gr1', force_delete=True)
 
            Session().commit()
kallithea/tests/functional/test_journal.py
Show inline comments
 
from kallithea.tests import *
 
import datetime
 

	
 

	
 
class TestJournalController(TestController):
 
class TestJournalController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='journal', action='index'))
 

	
 
        response.mustcontain("""<div class="journal_day">%s</div>""" % datetime.date.today())
 

	
 
    def test_stop_following_repository(self):
 
        session = self.log_user()
 
#        usr = Session().query(User).filter(User.username == TEST_USER_ADMIN_LOGIN).one()
 
#        repo = Session().query(Repository).filter(Repository.repo_name == HG_REPO).one()
 
#
 
#        followings = Session().query(UserFollowing) \
 
#            .filter(UserFollowing.user == usr) \
 
#            .filter(UserFollowing.follows_repository == repo).all()
 
#
 
#        assert len(followings) == 1, 'Not following any repository'
 
#
 
#        response = self.app.post(url(controller='journal',
 
#                                     action='toggle_following'),
 
#                                     {'follows_repo_id':repo.repo_id})
 

	
 
    def test_start_following_repository(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='journal', action='index'),)
 

	
 
    def test_public_journal_atom(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='journal', action='public_journal_atom'),)
 

	
 
    def test_public_journal_rss(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='journal', action='public_journal_rss'),)
kallithea/tests/functional/test_pullrequests.py
Show inline comments
 
import re
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.meta import Session
 

	
 
from kallithea.controllers.pullrequests import PullrequestsController
 

	
 
fixture = Fixture()
 

	
 
class TestPullrequestsController(TestController):
 
class TestPullrequestsController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='pullrequests', action='index',
 
                                    repo_name=HG_REPO))
 

	
 
    def test_create_trivial(self):
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:default:default',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 }
 
                                )
 
        self.assertEqual(response.status, '302 Found')
 
        response = response.follow()
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('This pull request has already been merged to default.')
 

	
 
    def test_create_with_existing_reviewer(self):
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:default:default',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': TEST_USER_ADMIN_LOGIN,
 
                                 }
 
                                )
 
        self.assertEqual(response.status, '302 Found')
 
        response = response.follow()
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('This pull request has already been merged to default.')
 

	
 
    def test_create_with_invalid_reviewer(self):
 
        invalid_user_name = 'invalid_user'
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {
 
                                  'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:default:default',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': invalid_user_name,
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_name)
 

	
 
    def test_update_with_invalid_reviewer(self):
 
        invalid_user_id = 99999
 
        self.log_user()
 
        # create a valid pull request
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {
 
                                  'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:default:default',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 }
 
                                )
 
        self.assertEqual(response.status, '302 Found')
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search('/pull-request/(\d+)/', response.location)
 
        self.assertNotEqual(m, None)
 
        pull_request_id = m.group(1)
 

	
 
        # update it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
                                  'updaterev': 'default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': invalid_user_id,
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_id)
 

	
 
    def test_edit_with_invalid_reviewer(self):
 
        invalid_user_id = 99999
 
        self.log_user()
 
        # create a valid pull request
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {
 
                                  'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:default:default',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:default',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 }
 
                                )
 
        self.assertEqual(response.status, '302 Found')
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search('/pull-request/(\d+)/', response.location)
 
        self.assertNotEqual(m, None)
 
        pull_request_id = m.group(1)
 

	
 
        # edit it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': invalid_user_id,
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_id)
 

	
 
class TestPullrequestsGetRepoRefs(TestController):
 

	
 
    def setUp(self):
 
        self.main = fixture.create_repo(u'main', repo_type='hg')
 
        Session.add(self.main)
 
        Session.commit()
 
        self.c = PullrequestsController()
 

	
 
    def tearDown(self):
 
        fixture.destroy_repo(u'main')
 
        Session.commit()
 
        Session.remove()
 

	
 
    def test_repo_refs_empty_repo(self):
 
        # empty repo with no commits, no branches, no bookmarks, just one tag
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        self.assertEqual(default, 'tag:null:0000000000000000000000000000000000000000')
 

	
 
    def test_repo_refs_one_commit_no_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        self.assertEqual(default, 'branch:default:%s' % cs0.raw_id)
 
        self.assertIn(([('branch:default:%s' % cs0.raw_id, 'default (current tip)')],
 
                'Branches'), refs)
 

	
 
    def test_repo_refs_one_commit_rev_hint(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance, rev=cs0.raw_id)
 
        expected = 'branch:default:%s' % cs0.raw_id
 
        self.assertEqual(default, expected)
 
        self.assertIn(([(expected, 'default (current tip)')], 'Branches'), refs)
 

	
 
    def test_repo_refs_two_commits_no_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.main.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.main.scm_instance)
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        self.assertEqual(default, expected)
 
        self.assertIn(([(expected, 'default (current tip)')], 'Branches'), refs)
 

	
 
    def test_repo_refs_two_commits_rev_hints(self):
 
        cs0 = fixture.commit_change(self.main.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.main.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
kallithea/tests/functional/test_repo_groups.py
Show inline comments
 
from kallithea.tests import *
 

	
 

	
 
class TestRepoGroupsController(TestController):
 
class TestRepoGroupsController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('repos_groups'))
 
        response.mustcontain('{"totalRecords": 0, "sort": null, "startIndex": 0, "dir": "asc", "records": []};')
 

	
 
#    def test_create(self):
 
#        response = self.app.post(url('repos_groups'))
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_repos_group'))
 

	
 
    def test_new_by_regular_user(self):
 
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        response = self.app.get(url('new_repos_group'), status=403)
 
#
 
#    def test_update(self):
 
#        response = self.app.put(url('repos_group', group_name=1))
 
#
 
#    def test_delete(self):
 
#        self.log_user()
 
#        response = self.app.delete(url('repos_group', group_name=1))
 
#
 
#    def test_show(self):
 
#        response = self.app.get(url('repos_group', group_name=1))
 
#
 
#    def test_edit(self):
 
#        response = self.app.get(url('edit_repo_group', group_name=1))
kallithea/tests/functional/test_summary.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestSummaryController(TestController):
 
class TestSummaryController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(HG_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name=HG_REPO))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">hg"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
        # clone url...
 
        response.mustcontain('''id="clone_url" readonly="readonly" value="http://%s@localhost:80/%s"''' % (TEST_USER_ADMIN_LOGIN, HG_REPO))
 
        response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://%s@localhost:80/_%s"''' % (TEST_USER_ADMIN_LOGIN, ID))
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name=GIT_REPO))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">git"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
        # clone url...
 
        response.mustcontain('''id="clone_url" readonly="readonly" value="http://%s@localhost:80/%s"''' % (TEST_USER_ADMIN_LOGIN, GIT_REPO))
 
        response.mustcontain('''id="clone_url_id" readonly="readonly" value="http://%s@localhost:80/_%s"''' % (TEST_USER_ADMIN_LOGIN, ID))
 

	
 
    def test_index_by_id_hg(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(HG_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">hg"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
    def test_index_by_repo_having_id_path_in_name_hg(self):
 
        self.log_user()
 
        fixture.create_repo(name=u'repo_1')
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='repo_1'))
 

	
 
        try:
 
            response.mustcontain("repo_1")
 
        finally:
 
            RepoModel().delete(Repository.get_by_repo_name(u'repo_1'))
 
            Session().commit()
 

	
 
    def test_index_by_id_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<span class="repotag">git"""
 
        )
 
        #public/private
 
        response.mustcontain(
 
            """<i class="icon-globe">"""
 
        )
 

	
 
    def _enable_stats(self, repo):
 
        r = Repository.get_by_repo_name(repo)
 
        r.enable_statistics = True
 
        Session().add(r)
 
        Session().commit()
 

	
 
    def test_index_trending(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(HG_REPO)
 

	
 
        ScmModel().mark_for_invalidation(HG_REPO)
 
        # generate statistics first
 
        response = self.app.get(url(controller='summary', action='statistics',
 
                                    repo_name=HG_REPO))
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=HG_REPO))
 
        response.mustcontain(
 
            '[["py", {"count": 68, "desc": ["Python"]}], '
 
            '["rst", {"count": 16, "desc": ["Rst"]}], '
 
            '["css", {"count": 2, "desc": ["Css"]}], '
 
            '["sh", {"count": 2, "desc": ["Bash"]}], '
 
            '["yml", {"count": 1, "desc": ["Yaml"]}], '
 
            '["makefile", {"count": 1, "desc": ["Makefile", "Makefile"]}], '
 
            '["js", {"count": 1, "desc": ["Javascript"]}], '
 
            '["cfg", {"count": 1, "desc": ["Ini"]}], '
 
            '["ini", {"count": 1, "desc": ["Ini"]}], '
 
            '["html", {"count": 1, "desc": ["EvoqueHtml", "Html"]}]];'
 
        )
 

	
 
    def test_index_statistics(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(HG_REPO)
 

	
 
        ScmModel().mark_for_invalidation(HG_REPO)
 
        response = self.app.get(url(controller='summary', action='statistics',
 
                                    repo_name=HG_REPO))
 

	
 
    def test_index_trending_git(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(GIT_REPO)
 

	
 
        ScmModel().mark_for_invalidation(GIT_REPO)
 
        # generate statistics first
 
        response = self.app.get(url(controller='summary', action='statistics',
 
                                    repo_name=GIT_REPO))
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=GIT_REPO))
 
        response.mustcontain(
 
            '[["py", {"count": 68, "desc": ["Python"]}], '
 
            '["rst", {"count": 16, "desc": ["Rst"]}], '
 
            '["css", {"count": 2, "desc": ["Css"]}], '
 
            '["sh", {"count": 2, "desc": ["Bash"]}], '
 
            '["makefile", {"count": 1, "desc": ["Makefile", "Makefile"]}], '
 
            '["js", {"count": 1, "desc": ["Javascript"]}], '
 
            '["cfg", {"count": 1, "desc": ["Ini"]}], '
 
            '["ini", {"count": 1, "desc": ["Ini"]}], '
 
            '["html", {"count": 1, "desc": ["EvoqueHtml", "Html"]}], '
 
            '["bat", {"count": 1, "desc": ["Batch"]}]];'
 
        )
 

	
 
    def test_index_statistics_git(self):
 
        self.log_user()
 
        #codes stats
 
        self._enable_stats(GIT_REPO)
 

	
 
        ScmModel().mark_for_invalidation(GIT_REPO)
 
        response = self.app.get(url(controller='summary', action='statistics',
 
                                    repo_name=GIT_REPO))
kallithea/tests/functional/test_tags.py
Show inline comments
 
from kallithea.tests import *
 

	
 

	
 
class TestTagsController(TestController):
 
class TestTagsController(TestControllerPytest):
 

	
 
    def test_index_hg(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='tags', action='index', repo_name=HG_REPO))
 
        response.mustcontain("""<a href="/%s/changeset/96507bd11ecc815ebc6270fdf6db110928c09c1e">tip</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/2c96c02def9a7c997f33047761a53943e6254396">v0.2.0</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/fef5bfe1dc17611d5fb59a7f6f95c55c3606f933">v0.1.11</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/92831aebf2f8dd4879e897024b89d09af214df1c">v0.1.10</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/8680b1d1cee3aa3c1ab3734b76ee164bbedbc5c9">v0.1.9</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/ecb25ba9c96faf1e65a0bc3fd914918420a2f116">v0.1.8</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/f67633a2894edaf28513706d558205fa93df9209">v0.1.7</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/02b38c0eb6f982174750c0e309ff9faddc0c7e12">v0.1.6</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/a6664e18181c6fc81b751a8d01474e7e1a3fe7fc">v0.1.5</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/fd4bdb5e9b2a29b4393a4ac6caef48c17ee1a200">v0.1.4</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/17544fbfcd33ffb439e2b728b5d526b1ef30bfcf">v0.1.3</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/a7e60bff65d57ac3a1a1ce3b12a70f8a9e8a7720">v0.1.2</a>""" % HG_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/eb3a60fc964309c1a318b8dfe26aa2d1586c85ae">v0.1.1</a>""" % HG_REPO)
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='tags', action='index', repo_name=GIT_REPO))
 

	
 
        response.mustcontain("""<a href="/%s/changeset/137fea89f304a42321d40488091ee2ed419a3686">v0.2.2</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/5051d0fa344d4408a2659d9a0348eb2d41868ecf">v0.2.1</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/599ba911aa24d2981225f3966eb659dfae9e9f30">v0.2.0</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/c60f01b77c42dce653d6b1d3b04689862c261929">v0.1.11</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/10cddef6b794696066fb346434014f0a56810218">v0.1.10</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/341d28f0eec5ddf0b6b77871e13c2bbd6bec685c">v0.1.9</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/74ebce002c088b8a5ecf40073db09375515ecd68">v0.1.8</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/4d78bf73b5c22c82b68f902f138f7881b4fffa2c">v0.1.7</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/0205cb3f44223fb3099d12a77a69c81b798772d9">v0.1.6</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/6c0ce52b229aa978889e91b38777f800e85f330b">v0.1.5</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/7d735150934cd7645ac3051903add952390324a5">v0.1.4</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/5a3a8fb005554692b16e21dee62bf02667d8dc3e">v0.1.3</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/0ba5f8a4660034ff25c0cac2a5baabf5d2791d63">v0.1.2</a>""" % GIT_REPO)
 
        response.mustcontain("""<a href="/%s/changeset/e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0">v0.1.1</a>""" % GIT_REPO)
0 comments (0 inline, 0 general)