Changeset - 63e49418a4cc
[Not reviewed]
beta
0 7 0
Marcin Kuzminski - 13 years ago 2013-03-31 21:44:27
marcin@python-works.com
Use only mustcontain for testing response body
7 files changed with 31 insertions and 25 deletions:
0 comments (0 inline, 0 general)
rhodecode/tests/__init__.py
Show inline comments
 
"""Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 

	
 
nosetests -x - fail on first error
 
nosetests rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
nosetests --pdb --pdb-failures
 
nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
 

	
 
optional FLAGS:
 
    RC_WHOOSH_TEST_DISABLE=1 - skip whoosh index building and tests
 
    RC_NO_TMP_PATH=1 - disable new temp path for tests, used mostly for test_vcs_operations
 

	
 
"""
 
import os
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from unittest import TestCase
 
from tempfile import _RandomNameSequence
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 
from nose.plugins.skip import SkipTest
 

	
 
from rhodecode import is_windows
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User
 
from rhodecode.tests.nose_parametrized import parameterized
 

	
 
import pylons.test
 
from rhodecode.lib.utils2 import safe_unicode, safe_str
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
 
    'SkipTest',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS', '_get_repo_create_params',
 
    '_get_group_create_params'
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from rhodecode.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.Session = Session
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
rhodecode/tests/functional/test_admin_ldap_settings.py
Show inline comments
 
from rhodecode.tests import *
 
from rhodecode.model.db import RhodeCodeSetting
 
from nose.plugins.skip import SkipTest
 

	
 
skip_ldap_test = False
 
try:
 
    import ldap
 
except ImportError:
 
    # means that python-ldap is not installed
 
    skip_ldap_test = True
 
    pass
 

	
 

	
 
class TestLdapSettingsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='admin/ldap_settings',
 
                                    action='index'))
 
        self.assertTrue('LDAP administration' in response.body)
 
        response.mustcontain('LDAP administration')
 

	
 
    def test_ldap_save_settings(self):
 
        self.log_user()
 
        if skip_ldap_test:
 
            raise SkipTest('skipping due to missing ldap lib')
 

	
 
        test_url = url(controller='admin/ldap_settings',
 
                       action='ldap_settings')
 

	
 
        response = self.app.post(url=test_url,
 
            params={'ldap_host' : u'dc.example.com',
 
                    'ldap_port' : '999',
 
                    'ldap_tls_kind' : 'PLAIN',
 
                    'ldap_tls_reqcert' : 'NEVER',
 
                    'ldap_dn_user':'test_user',
 
                    'ldap_dn_pass':'test_pass',
 
                    'ldap_base_dn':'test_base_dn',
 
                    'ldap_filter':'test_filter',
 
                    'ldap_search_scope':'BASE',
 
                    'ldap_attr_login':'test_attr_login',
 
                    'ldap_attr_firstname':'ima',
 
                    'ldap_attr_lastname':'tester',
 
                    'ldap_attr_email':'test@example.com' })
 

	
 
        new_settings = RhodeCodeSetting.get_ldap_settings()
 
        self.assertEqual(new_settings['ldap_host'], u'dc.example.com',
 
                         'fail db write compare')
 

	
 
        self.checkSessionFlash(response,
 
                               'LDAP settings updated successfully')
 

	
 
    def test_ldap_error_form(self):
 
        self.log_user()
 
        if skip_ldap_test:
 
            raise SkipTest('skipping due to missing ldap lib')
 

	
 
        test_url = url(controller='admin/ldap_settings',
 
                       action='ldap_settings')
 

	
 
        response = self.app.post(url=test_url,
 
            params={'ldap_host' : '',
 
                    'ldap_port' : 'i-should-be-number',
 
                    'ldap_tls_kind' : 'PLAIN',
 
                    'ldap_tls_reqcert' : 'NEVER',
 
                    'ldap_dn_user':'',
 
                    'ldap_dn_pass':'',
 
                    'ldap_base_dn':'',
 
                    'ldap_filter':'',
 
                    'ldap_search_scope':'BASE',
 
                    'ldap_attr_login':'', #  <----- missing required input
 
                    'ldap_attr_firstname':'',
 
                    'ldap_attr_lastname':'',
 
                    'ldap_attr_email':'' })
 

	
 
        self.assertTrue("""<span class="error-message">The LDAP Login"""
 
                        """ attribute of the CN must be specified""" in
 
                        response.body)
 
        response.mustcontain("""<span class="error-message">The LDAP Login"""
 
                             """ attribute of the CN must be specified""")
 

	
 

	
 

	
 
        self.assertTrue("""<span class="error-message">Please """
 
                        """enter a number</span>""" in response.body)
 
        response.mustcontain("""<span class="error-message">Please """
 
                             """enter a number</span>""")
 

	
 
    def test_ldap_login(self):
 
        pass
 

	
 
    def test_ldap_login_incorrect(self):
 
        pass
rhodecode/tests/functional/test_admin_notifications.py
Show inline comments
 
from rhodecode.tests import *
 
from rhodecode.model.db import Notification, User
 

	
 
from rhodecode.model.user import UserModel
 
from rhodecode.model.notification import NotificationModel
 

	
 

	
 
class TestNotificationsController(TestController):
 

	
 
    def tearDown(self):
 
        for n in Notification.query().all():
 
            inst = Notification.get(n.notification_id)
 
            self.Session().delete(inst)
 
        self.Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 

	
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                               email='u1@rhodecode.org',
 
                                               firstname='u1', lastname='u1')
 
        u1 = u1.user_id
 

	
 
        response = self.app.get(url('notifications'))
 
        self.assertTrue('''<div class="table">No notifications here yet</div>'''
 
                        in response.body)
 
        response.mustcontain('<div class="table">No notifications here yet</div>')
 

	
 
        cur_user = self._get_logged_user()
 

	
 
        NotificationModel().create(created_by=u1, subject=u'test_notification_1',
 
                                   body=u'notification_1',
 
                                   recipients=[cur_user])
 
        self.Session().commit()
 
        response = self.app.get(url('notifications'))
 
        self.assertTrue(u'test_notification_1' in response.body)
 
        response.mustcontain(u'test_notification_1')
 

	
 
#    def test_index_as_xml(self):
 
#        response = self.app.get(url('formatted_notifications', format='xml'))
 
#
 
#    def test_create(self):
 
#        response = self.app.post(url('notifications'))
 
#
 
#    def test_new(self):
 
#        response = self.app.get(url('new_notification'))
 
#
 
#    def test_new_as_xml(self):
 
#        response = self.app.get(url('formatted_new_notification', format='xml'))
 
#
 
#    def test_update(self):
 
#        response = self.app.put(url('notification', notification_id=1))
 
#
 
#    def test_update_browser_fakeout(self):
 
#        response = self.app.post(url('notification', notification_id=1), params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 

	
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                               email='u1@rhodecode.org',
 
                                               firstname='u1', lastname='u1')
 
        u2 = UserModel().create_or_update(username='u2', password='qweqwe',
 
                                               email='u2@rhodecode.org',
 
                                               firstname='u2', lastname='u2')
 

	
 
        # make notifications
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=u'test',
 
                                                  body=u'hi there',
 
                                                  recipients=[cur_user, u1, u2])
 
        self.Session().commit()
 
        u1 = User.get(u1.user_id)
 
        u2 = User.get(u2.user_id)
 

	
 
        # check DB
 
        get_notif = lambda un: [x.notification for x in un]
 
        self.assertEqual(get_notif(cur_user.notifications), [notification])
 
        self.assertEqual(get_notif(u1.notifications), [notification])
 
        self.assertEqual(get_notif(u2.notifications), [notification])
 
        cur_usr_id = cur_user.user_id
 

	
 
        response = self.app.delete(url('notification',
 
                                       notification_id=
 
                                       notification.notification_id))
 
        self.assertEqual(response.body, 'ok')
 

	
 
        cur_user = User.get(cur_usr_id)
 
        self.assertEqual(cur_user.notifications, [])
 

	
 
    def test_show(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                               email='u1@rhodecode.org',
 
                                               firstname='u1', lastname='u1')
 
        u2 = UserModel().create_or_update(username='u2', password='qweqwe',
 
                                               email='u2@rhodecode.org',
 
                                               firstname='u2', lastname='u2')
 

	
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=u'test',
 
                                                  body=u'hi there',
 
                                                  recipients=[cur_user, u1, u2])
 

	
 
        response = self.app.get(url('notification',
 
                                    notification_id=notification.notification_id))
rhodecode/tests/functional/test_admin_settings.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from rhodecode.lib.auth import get_crypt_password, check_password
 
from rhodecode.model.db import User, RhodeCodeSetting, Repository
 
from rhodecode.tests import *
 
from rhodecode.lib import helpers as h
 
from rhodecode.model.user import UserModel
 
from rhodecode.model.scm import ScmModel
 

	
 

	
 
class TestAdminSettingsController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url('admin_settings'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_admin_settings', format='xml'))
 

	
 
    def test_create(self):
 
        response = self.app.post(url('admin_settings'))
 

	
 
    def test_new(self):
 
        response = self.app.get(url('admin_new_setting'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_admin_new_setting', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('admin_setting', setting_id=1))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        response = self.app.delete(url('admin_setting', setting_id=1))
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        response = self.app.get(url('admin_setting', setting_id=1))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_admin_setting', setting_id=1, format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('admin_edit_setting', setting_id=1))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_admin_edit_setting',
 
                                    setting_id=1, format='xml'))
 

	
 
    def test_ga_code_active(self):
 
        self.log_user()
 
        old_title = 'RhodeCode'
 
        old_realm = 'RhodeCode authentication'
 
        new_ga_code = 'ga-test-123456789'
 
        response = self.app.post(url('admin_setting', setting_id='global'),
 
                                     params=dict(
 
                                                 _method='put',
 
                                                 rhodecode_title=old_title,
 
                                                 rhodecode_realm=old_realm,
 
                                                 rhodecode_ga_code=new_ga_code
 
                                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 

	
 
        self.assertEqual(RhodeCodeSetting
 
                         .get_app_settings()['rhodecode_ga_code'], new_ga_code)
 

	
 
        response = response.follow()
 
        response.mustcontain("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code)
 

	
 
    def test_ga_code_inactive(self):
 
        self.log_user()
 
        old_title = 'RhodeCode'
 
        old_realm = 'RhodeCode authentication'
 
        new_ga_code = ''
 
        response = self.app.post(url('admin_setting', setting_id='global'),
 
                                     params=dict(
 
                                                 _method='put',
 
                                                 rhodecode_title=old_title,
 
                                                 rhodecode_realm=old_realm,
 
                                                 rhodecode_ga_code=new_ga_code
 
                                                 ))
 

	
 
        self.checkSessionFlash(response, 'Updated application settings')
 
        self.assertEqual(RhodeCodeSetting
 
                        .get_app_settings()['rhodecode_ga_code'], new_ga_code)
 

	
 
        response = response.follow()
 
        self.assertFalse("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code
 
                         in response.body)
 
        response.mustcontain(no=["_gaq.push(['_setAccount', '%s']);" % new_ga_code])
 

	
 
    def test_title_change(self):
 
        self.log_user()
 
        old_title = 'RhodeCode'
 
        new_title = old_title + '_changed'
 
        old_realm = 'RhodeCode authentication'
 

	
 
        for new_title in ['Changed', 'Żółwik', old_title]:
 
            response = self.app.post(url('admin_setting', setting_id='global'),
 
                                         params=dict(
 
                                                     _method='put',
 
                                                     rhodecode_title=new_title,
 
                                                     rhodecode_realm=old_realm,
 
                                                     rhodecode_ga_code=''
 
                                                     ))
 

	
 
            self.checkSessionFlash(response, 'Updated application settings')
 
            self.assertEqual(RhodeCodeSetting
 
                             .get_app_settings()['rhodecode_title'],
 
                             new_title.decode('utf-8'))
 

	
 
            response = response.follow()
 
            response.mustcontain("""<h1><a href="/">%s</a></h1>""" % new_title)
 

	
 
    def test_my_account(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_my_account'))
 

	
 
        self.assertTrue('value="test_admin' in response.body)
 
        response.mustcontain('value="test_admin')
 

	
 
    @parameterized.expand([('firstname', 'new_username'),
 
                           ('lastname', 'new_username'),
 
                           ('admin', True),
 
                           ('admin', False),
 
                           ('ldap_dn', 'test'),
 
                           ('ldap_dn', None),
 
                           ('active', False),
 
                           ('active', True),
 
                           ('email', 'some@email.com'),
 
                           ])
 
    def test_my_account_update(self, name, expected):
 
        uname = 'testme'
 
        usr = UserModel().create_or_update(username=uname, password='qweqwe',
 
                                           email='testme@rhodecod.org')
 
        self.Session().commit()
 
        params = usr.get_api_data()
 
        user_id = usr.user_id
 
        self.log_user(username=uname, password='qweqwe')
 
        params.update({name: expected})
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 

	
 
        try:
 
            response = self.app.put(url('admin_settings_my_account_update',
 
                                        id=user_id), params)
 

	
 
            self.checkSessionFlash(response,
 
                                   'Your account was updated successfully')
 

	
 
            updated_user = User.get_by_username(uname)
 
            updated_params = updated_user.get_api_data()
 
            updated_params.update({'password_confirmation': ''})
 
            updated_params.update({'new_password': ''})
 

	
 
            params['last_login'] = updated_params['last_login']
 
            if name == 'email':
 
                params['emails'] = [expected]
 
            if name == 'ldap_dn':
 
                #cannot update this via form
 
                params['ldap_dn'] = None
 
            if name == 'active':
 
                #my account cannot deactivate account
 
                params['active'] = True
 
            if name == 'admin':
 
                #my account cannot make you an admin !
 
                params['admin'] = False
 

	
 
            self.assertEqual(params, updated_params)
 

	
 
        finally:
 
            UserModel().delete('testme')
 

	
 
    def test_my_account_update_err_email_exists(self):
 
        self.log_user()
 

	
 
        new_email = 'test_regular@mail.com'  # already exisitn email
 
        response = self.app.put(url('admin_settings_my_account_update'),
 
                                params=dict(
 
                                    username='test_admin',
 
                                    new_password='test12',
 
                                    password_confirmation='test122',
 
                                    firstname='NewName',
 
                                    lastname='NewLastname',
 
                                    email=new_email,)
 
                                )
 

	
 
        response.mustcontain('This e-mail address is already taken')
 

	
 
    def test_my_account_update_err(self):
 
        self.log_user('test_regular2', 'test12')
 

	
 
        new_email = 'newmail.pl'
 
        response = self.app.post(url('admin_settings_my_account_update'),
 
                                 params=dict(
 
                                            _method='put',
 
                                            username='test_admin',
 
                                            new_password='test12',
 
                                            password_confirmation='test122',
 
                                            firstname='NewName',
 
                                            lastname='NewLastname',
 
                                            email=new_email,)
 
                                 )
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        from rhodecode.model import validators
 
        msg = validators.ValidUsername(edit=False,
 
                                    old_data={})._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': 'test_admin'})
 
        response.mustcontain(u"%s" % msg)
 

	
 
    def test_set_repo_fork_has_no_self_id(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        response = self.app.get(url('edit_repo', repo_name=HG_REPO))
 
        opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
 
        assert opt not in response.body
 
        response.mustcontain(no=[opt])
 

	
 
    def test_set_fork_of_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        repo2 = Repository.get_by_repo_name(GIT_REPO)
 
        response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
 
                                 params=dict(
 
                                    id_fork_of=repo2.repo_id
 
                                 ))
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        repo2 = Repository.get_by_repo_name(GIT_REPO)
 
        self.checkSessionFlash(response,
 
        'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
 

	
 
        assert repo.fork == repo2
 
        response = response.follow()
 
        # check if given repo is selected
 

	
 
        opt = """<option value="%s" selected="selected">%s</option>""" % (
 
                                                repo2.repo_id, repo2.repo_name)
 
        response.mustcontain(opt)
 

	
 
        # clean session flash
 
        #response = self.app.get(url('edit_repo', repo_name=HG_REPO))
 

	
 
        ## mark it as None
 
        response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
 
                                 params=dict(
 
                                    id_fork_of=None
 
                                 ))
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        repo2 = Repository.get_by_repo_name(GIT_REPO)
 
        self.checkSessionFlash(response,
 
        'Marked repo %s as fork of %s' % (repo.repo_name, "Nothing"))
 
        assert repo.fork == None
 

	
 
    def test_set_fork_of_same_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
 
                                 params=dict(
 
                                    id_fork_of=repo.repo_id
 
                                 ))
 
        self.checkSessionFlash(response,
 
                               'An error occurred during this operation')
rhodecode/tests/functional/test_followers.py
Show inline comments
 
from rhodecode.tests import *
 

	
 

	
 
class TestFollowersController(TestController):
 

	
 
    def test_index(self):
 
    def test_index_hg(self):
 
        self.log_user()
 
        repo_name = HG_REPO
 
        response = self.app.get(url(controller='followers',
 
                                    action='followers',
 
                                    repo_name=repo_name))
 

	
 
        self.assertTrue("""test_admin""" in response.body)
 
        self.assertTrue("""Started following""" in response.body)
 
        response.mustcontain("""test_admin""")
 
        response.mustcontain("""Started following""")
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        repo_name = GIT_REPO
 
        response = self.app.get(url(controller='followers',
 
                                    action='followers',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain("""test_admin""")
 
        response.mustcontain("""Started following""")
rhodecode/tests/functional/test_forks.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.db import Repository
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.user import UserModel
 
from rhodecode.model.meta import Session
 

	
 

	
 
class TestForksController(TestController):
 

	
 
    def setUp(self):
 
        self.username = u'forkuser'
 
        self.password = u'qweqwe'
 
        self.u1 = UserModel().create_or_update(
 
            username=self.username, password=self.password,
 
            email=u'fork_king@rhodecode.org', firstname=u'u1', lastname=u'u1'
 
        )
 
        Session().commit()
 

	
 
    def tearDown(self):
 
        Session().delete(self.u1)
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        repo_name = HG_REPO
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        self.assertTrue("""There are no forks yet""" in response.body)
 
        response.mustcontain("""There are no forks yet""")
 

	
 
    def test_no_permissions_to_fork(self):
 
        usr = self.log_user(TEST_USER_REGULAR_LOGIN,
 
                            TEST_USER_REGULAR_PASS)['user_id']
 
        user_model = UserModel()
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 
        u = UserModel().get(usr)
 
        u.inherit_default_permissions = False
 
        Session().commit()
 
        # try create a fork
 
        repo_name = HG_REPO
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), {}, status=403)
 

	
 
    def test_index_with_fork_hg(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        fork_name = HG_FORK
 
        description = 'fork of vcs test'
 
        repo_name = HG_REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        response = self.app.post(url(controller='forks',
 
                                     action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'repo_name': fork_name,
 
                                     'repo_group': '',
 
                                     'fork_parent_id': org_repo.repo_id,
 
                                     'repo_type': 'hg',
 
                                     'description': description,
 
                                     'private': 'False',
 
                                     'landing_rev': 'tip'})
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (fork_name, fork_name)
 
        )
 

	
 
        #remove this fork
 
        response = self.app.delete(url('repo', repo_name=fork_name))
 

	
 
    def test_index_with_fork_git(self):
 
        self.log_user()
 

	
 
        # create a fork
 
        fork_name = GIT_FORK
 
        description = 'fork of vcs test'
 
        repo_name = GIT_REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        response = self.app.post(url(controller='forks',
 
                                     action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'repo_name': fork_name,
 
                                     'repo_group': '',
 
                                     'fork_parent_id': org_repo.repo_id,
 
                                     'repo_type': 'git',
 
                                     'description': description,
 
                                     'private': 'False',
 
                                     'landing_rev': 'tip'})
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (fork_name, fork_name)
 
        )
 

	
 
        #remove this fork
 
        response = self.app.delete(url('repo', repo_name=fork_name))
 

	
 
    def test_z_fork_create(self):
 
        self.log_user()
 
        fork_name = HG_FORK
 
        description = 'fork of vcs test'
 
        repo_name = HG_REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        response = self.app.post(url(controller='forks', action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'repo_name': fork_name,
 
                                     'repo_group':'',
 
                                     'fork_parent_id':org_repo.repo_id,
 
                                     'repo_type':'hg',
 
                                     'description':description,
 
                                     'private':'False',
 
                                     'landing_rev': 'tip'})
 

	
 
        #test if we have a message that fork is ok
 
        self.checkSessionFlash(response,
 
                'Forked repository %s as <a href="/%s">%s</a>'
 
                % (repo_name, fork_name, fork_name))
 

	
 
        #test if the fork was created in the database
 
        fork_repo = Session().query(Repository)\
 
            .filter(Repository.repo_name == fork_name).one()
 

	
 
        self.assertEqual(fork_repo.repo_name, fork_name)
 
        self.assertEqual(fork_repo.fork.repo_name, repo_name)
 

	
 
        #test if fork is visible in the list ?
 
        response = response.follow()
 

	
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=fork_name))
 

	
 
        self.assertTrue('Fork of %s' % repo_name in response.body)
 
        response.mustcontain('Fork of %s' % repo_name)
 

	
 
    def test_zz_fork_permission_page(self):
 
        usr = self.log_user(self.username, self.password)['user_id']
 
        repo_name = HG_REPO
 

	
 
        forks = Session().query(Repository)\
 
            .filter(Repository.fork_id != None)\
 
            .all()
 
        self.assertEqual(1, len(forks))
 

	
 
        # set read permissions for this
 
        RepoModel().grant_user_permission(repo=forks[0],
 
                                          user=usr,
 
                                          perm='repository.read')
 
        Session().commit()
 

	
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 

	
 
        response.mustcontain('<div style="padding:5px 3px 3px 42px;">fork of vcs test</div>')
 

	
 
    def test_zzz_fork_permission_page(self):
 
        usr = self.log_user(self.username, self.password)['user_id']
 
        repo_name = HG_REPO
 

	
 
        forks = Session().query(Repository)\
 
            .filter(Repository.fork_id != None)\
 
            .all()
 
        self.assertEqual(1, len(forks))
 

	
 
        # set none
 
        RepoModel().grant_user_permission(repo=forks[0],
 
                                          user=usr, perm='repository.none')
 
        Session().commit()
 
        # fork shouldn't be there
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 
        response.mustcontain('There are no forks yet')
rhodecode/tests/functional/test_search.py
Show inline comments
 
import os
 
from rhodecode.tests import *
 
from nose.plugins.skip import SkipTest
 

	
 

	
 
class TestSearchController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'))
 

	
 
        self.assertTrue('class="small" id="q" name="q" type="text"' in
 
                        response.body)
 
        response.mustcontain('class="small" id="q" name="q" type="text"')
 
        # Test response...
 

	
 
    def test_empty_search(self):
 
        if os.path.isdir(self.index_location):
 
            raise SkipTest('skipped due to existing index')
 
        else:
 
            self.log_user()
 
            response = self.app.get(url(controller='search', action='index'),
 
                                    {'q': HG_REPO})
 
            self.assertTrue('There is no index to search in. '
 
                            'Please run whoosh indexer' in response.body)
 
            response.mustcontain('There is no index to search in. '
 
                                 'Please run whoosh indexer')
 

	
 
    def test_normal_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'def repo'})
 
        response.mustcontain('39 results')
 

	
 
    def test_repo_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'repository:%s def test' % HG_REPO})
 

	
 
        response.mustcontain('4 results')
 

	
 
    def test_search_last(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'last:t', 'type': 'commit'})
 

	
 
        response.mustcontain('2 results')
 

	
 
    def test_search_commit_message(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                    {'q': 'bother to ask where to fetch repo during tests',
 
                     'type': 'commit'})
 

	
 
        response.mustcontain('2 results')
 
        response.mustcontain('a00c1b6f5d7a6ae678fd553a8b81d92367f7ecf1')
 
        response.mustcontain('c6eb379775c578a95dad8ddab53f963b80894850')
 

	
 
    def test_search_commit_message_hg_repo(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index',
 
                                    repo_name=HG_REPO),
 
                    {'q': 'bother to ask where to fetch repo during tests',
 
                     'type': 'commit'})
 

	
 
        response.mustcontain('1 results')
 
        response.mustcontain('a00c1b6f5d7a6ae678fd553a8b81d92367f7ecf1')
 

	
 
    def test_search_commit_changed_file(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'changed:tests/utils.py',
 
                                 'type': 'commit'})
 

	
 
        response.mustcontain('20 results')
 

	
 
    def test_search_commit_changed_files_get_commit(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'changed:vcs/utils/lazy.py',
 
                                 'type': 'commit'})
 

	
 
        response.mustcontain('7 results')
 
        response.mustcontain('36e0fc9d2808c5022a24f49d6658330383ed8666')
 
        response.mustcontain('af182745859d779f17336241a0815d15166ae1ee')
 
        response.mustcontain('17438a11f72b93f56d0e08e7d1fa79a378578a82')
 
        response.mustcontain('33fa3223355104431402a888fa77a4e9956feb3e')
 
        response.mustcontain('d1f898326327e20524fe22417c22d71064fe54a1')
 
        response.mustcontain('fe568b4081755c12abf6ba673ba777fc02a415f3')
 
        response.mustcontain('bafe786f0d8c2ff7da5c1dcfcfa577de0b5e92f1')
 

	
 
    def test_search_commit_added_file(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q': 'added:README.rst',
 
                                 'type': 'commit'})
 

	
 
        response.mustcontain('2 results')
 
        #HG
 
        response.mustcontain('3803844fdbd3b711175fc3da9bdacfcd6d29a6fb')
 
        #GIT
 
        response.mustcontain('ff7ca51e58c505fec0dd2491de52c622bb7a806b')
 

	
 
    def test_search_author(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                    {'q': 'author:marcin@python-blog.com raw_id:b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                     'type': 'commit'})
 

	
 
        response.mustcontain('1 results')
 

	
 
    def test_search_file_name(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                    {'q': 'README.rst', 'type': 'path'})
 

	
 
        response.mustcontain('2 results')
0 comments (0 inline, 0 general)