Changeset - 8e3137064ab6
[Not reviewed]
default
0 4 0
Thomas De Schampheleire - 9 years ago 2016-11-22 09:04:45
thomas.de.schampheleire@gmail.com
tests: use test_context for tests needing internationalization

Instead of relying on the top-level handling of the translator, use the
newly introduced test_context.
4 files changed with 296 insertions and 274 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/functional/test_admin_notifications.py
Show inline comments
 
from kallithea.tests.base import *
 
from kallithea.model.db import User
 

	
 
from kallithea.model.user import UserModel
 
from kallithea.model.notification import NotificationModel
 
from kallithea.model.meta import Session
 
from kallithea.lib import helpers as h
 

	
 
from kallithea.tests.test_context import test_context
 

	
 
class TestNotificationsController(TestController):
 
    def setup_method(self, method):
 
        self.remove_all_notifications()
 

	
 
    def test_index(self, create_test_user):
 
        self.log_user()
 

	
 
        u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                   email='u1@example.com',
 
                                   firstname=u'u1', lastname=u'u1',
 
                                   active=True))
 
        u1 = u1.user_id
 
        Session().commit()
 

	
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('<div class="table">No notifications here yet</div>')
 

	
 
        cur_user = self._get_logged_user()
 
        notif = NotificationModel().create(created_by=u1, subject=u'test_notification_1',
 
                                           body=u'notification_1', recipients=[cur_user])
 
        Session().commit()
 
        with test_context(self.app):
 
            cur_user = self._get_logged_user()
 
            notif = NotificationModel().create(created_by=u1, subject=u'test_notification_1',
 
                                               body=u'notification_1', recipients=[cur_user])
 
            Session().commit()
 

	
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('id="notification_%s"' % notif.notification_id)
 

	
 
    def test_delete(self, create_test_user):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 

	
 
        u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                   email='u1@example.com',
 
                                   firstname=u'u1', lastname=u'u1',
 
                                   active=True))
 
        u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                   email='u2@example.com',
 
                                   firstname=u'u2', lastname=u'u2',
 
                                   active=True))
 
        with test_context(self.app):
 
            u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                       email='u1@example.com',
 
                                       firstname=u'u1', lastname=u'u1',
 
                                       active=True))
 
            u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                       email='u2@example.com',
 
                                       firstname=u'u2', lastname=u'u2',
 
                                       active=True))
 

	
 
        # make notifications
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=u'test',
 
                                                  body=u'hi there',
 
                                                  recipients=[cur_user, u1, u2])
 
        Session().commit()
 
        u1 = User.get(u1.user_id)
 
        u2 = User.get(u2.user_id)
 
            # make notifications
 
            notification = NotificationModel().create(created_by=cur_user,
 
                                                      subject=u'test',
 
                                                      body=u'hi there',
 
                                                      recipients=[cur_user, u1, u2])
 
            Session().commit()
 
            u1 = User.get(u1.user_id)
 
            u2 = User.get(u2.user_id)
 

	
 
        # check DB
 
        get_notif = lambda un: [x.notification for x in un]
 
        assert get_notif(cur_user.notifications) == [notification]
 
        assert get_notif(u1.notifications) == [notification]
 
        assert get_notif(u2.notifications) == [notification]
 
        cur_usr_id = cur_user.user_id
 

	
 
        response = self.app.post(
 
            url('notification_delete', notification_id=notification.notification_id),
 
            params={'_authentication_token': self.authentication_token()})
 
        assert response.body == 'ok'
 

	
 
        cur_user = User.get(cur_usr_id)
 
        assert cur_user.notifications == []
 

	
 
    def test_show(self, create_test_user):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                   email='u1@example.com',
 
                                   firstname=u'u1', lastname=u'u1',
 
                                   active=True))
 
        u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                   email='u2@example.com',
 
                                   firstname=u'u2', lastname=u'u2',
 
                                   active=True))
 
        Session().commit()
 
        with test_context(self.app):
 
            cur_user = self._get_logged_user()
 
            u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                       email='u1@example.com',
 
                                       firstname=u'u1', lastname=u'u1',
 
                                       active=True))
 
            u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                       email='u2@example.com',
 
                                       firstname=u'u2', lastname=u'u2',
 
                                       active=True))
 
            Session().commit()
 

	
 
        subject = u'test'
 
        notif_body = u'hi there'
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=subject,
 
                                                  body=notif_body,
 
                                                  recipients=[cur_user, u1, u2])
 
            subject = u'test'
 
            notif_body = u'hi there'
 
            notification = NotificationModel().create(created_by=cur_user,
 
                                                      subject=subject,
 
                                                      body=notif_body,
 
                                                      recipients=[cur_user, u1, u2])
 

	
 
        response = self.app.get(url('notification',
 
                                    notification_id=notification.notification_id))
 

	
 
        response.mustcontain(subject)
 
        response.mustcontain(notif_body)
 

	
 
    def test_description_with_age(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 
        with test_context(self.app):
 
            cur_user = self._get_logged_user()
 
            subject = u'test'
 
            notify_body = u'hi there'
 

	
 
        description = NotificationModel().make_description(notification)
 
        assert description == "{0} sent message {1}".format(
 
                cur_user.username,
 
                h.age(notification.created_on)
 
                )
 
            notification = NotificationModel().create(created_by = cur_user,
 
                                                      subject    = subject,
 
                                                      body       = notify_body)
 

	
 
            description = NotificationModel().make_description(notification)
 
            assert description == "{0} sent message {1}".format(
 
                    cur_user.username,
 
                    h.age(notification.created_on)
 
                    )
 

	
 
    def test_description_with_datetime(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 
        with test_context(self.app):
 
            cur_user = self._get_logged_user()
 
            subject = u'test'
 
            notify_body = u'hi there'
 
            notification = NotificationModel().create(created_by = cur_user,
 
                                                      subject    = subject,
 
                                                      body       = notify_body)
 

	
 
        description = NotificationModel().make_description(notification, False)
 
        assert description == "{0} sent message at {1}".format(
 
                cur_user.username,
 
                h.fmt_date(notification.created_on)
 
                )
 
            description = NotificationModel().make_description(notification, False)
 
            assert description == "{0} sent message at {1}".format(
 
                    cur_user.username,
 
                    h.fmt_date(notification.created_on)
 
                    )
 

	
 
    def test_mark_all_read(self, create_test_user):
 
        self.log_user()
 
        u0 = self._get_logged_user()
 
        u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                   email='u1@example.com',
 
                                   firstname=u'u1', lastname=u'u1',
 
                                   active=True))
 
        u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                   email='u2@example.com',
 
                                   firstname=u'u2', lastname=u'u2',
 
                                   active=True))
 
        notif = NotificationModel().create(created_by=u1,
 
                                           subject=u'subject',
 
                                           body=u'body',
 
                                           recipients=[u0, u2])
 
        u0_id, u1_id, u2_id = u0.user_id, u1.user_id, u2.user_id
 
        with test_context(self.app):
 
            u0 = self._get_logged_user()
 
            u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                       email='u1@example.com',
 
                                       firstname=u'u1', lastname=u'u1',
 
                                       active=True))
 
            u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                       email='u2@example.com',
 
                                       firstname=u'u2', lastname=u'u2',
 
                                       active=True))
 
            notif = NotificationModel().create(created_by=u1,
 
                                               subject=u'subject',
 
                                               body=u'body',
 
                                               recipients=[u0, u2])
 
            u0_id, u1_id, u2_id = u0.user_id, u1.user_id, u2.user_id
 

	
 
        assert [n.read for n in u0.notifications] == [False]
 
        assert u1.notifications == []
 
        assert [n.read for n in u2.notifications] == [False]
 
            assert [n.read for n in u0.notifications] == [False]
 
            assert u1.notifications == []
 
            assert [n.read for n in u2.notifications] == [False]
 

	
 
        # Mark all read for current user.
 

	
 
        response = self.app.get(url('notifications_mark_all_read'), # TODO: should be POST
 
                                extra_environ=dict(HTTP_X_PARTIAL_XHR='1'))
 

	
 
        assert response.status_int == 200
 
        response.mustcontain('id="notification_%s"' % notif.notification_id)
 

	
 
        u0 = User.get(u0_id)
 
        u1 = User.get(u1_id)
 
        u2 = User.get(u2_id)
 

	
 
        assert [n.read for n in u0.notifications] == [True]
 
        assert u1.notifications == []
 
        assert [n.read for n in u2.notifications] == [False]
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from sqlalchemy.orm.exc import NoResultFound, ObjectDeletedError
 

	
 
import pytest
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.tests.test_context import test_context
 
from kallithea.controllers.admin.users import UsersController
 
from kallithea.model.db import User, Permission, UserIpMap, UserApiKeys
 
from kallithea.lib.auth import check_password
 
from kallithea.model.user import UserModel
 
from kallithea.model import validators
 
from kallithea.lib import helpers as h
 
from kallithea.model.meta import Session
 
from webob.exc import HTTPNotFound
 

	
 
fixture = Fixture()
 

	
 
@pytest.yield_fixture
 
def user_and_repo_group_fail():
 
    username = 'repogrouperr'
 
    groupname = u'repogroup_fail'
 
    user = fixture.create_user(name=username)
 
    repo_group = fixture.create_repo_group(name=groupname, cur_user=username)
 
    yield user, repo_group
 
    # cleanup
 
    try:
 
        fixture.destroy_repo_group(repo_group)
 
    except ObjectDeletedError:
 
        # delete already succeeded in test body
 
        pass
 

	
 
class TestAdminUsersController(TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users'))
 
        # TODO: Test response...
 

	
 
    def test_create(self):
 
        self.log_user()
 
        username = 'newtestuser'
 
        password = 'test12'
 
        password_confirmation = password
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'mail@example.com'
 

	
 
        response = self.app.post(url('new_user'),
 
            {'username': username,
 
             'password': password,
 
             'password_confirmation': password_confirmation,
 
             'firstname': name,
 
             'active': True,
 
             'lastname': lastname,
 
             'extern_name': 'internal',
 
             'extern_type': 'internal',
 
             'email': email,
 
             '_authentication_token': self.authentication_token()})
 
        # 302 Found
 
        # The resource was found at http://localhost/_admin/users/5/edit; you should be redirected automatically.
 

	
 
        self.checkSessionFlash(response, '''Created user %s''' % username)
 

	
 
        response = response.follow()
 
        response.mustcontain("""%s user settings""" % username) # in <title>
 

	
 
        new_user = Session().query(User). \
 
            filter(User.username == username).one()
 

	
 
        assert new_user.username == username
 
        assert check_password(password, new_user.password) == True
 
        assert new_user.name == name
 
        assert new_user.lastname == lastname
 
        assert new_user.email == email
 

	
 
    def test_create_err(self):
 
        self.log_user()
 
        username = 'new_user'
 
        password = ''
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'errmail.example.com'
 

	
 
        response = self.app.post(url('new_user'),
 
            {'username': username,
 
             'password': password,
 
             'name': name,
 
             'active': False,
 
             'lastname': lastname,
 
             'email': email,
 
             '_authentication_token': self.authentication_token()})
 

	
 
        msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
 
        msg = h.html_escape(msg % {'username': 'new_user'})
 
        response.mustcontain("""<span class="error-message">%s</span>""" % msg)
 
        response.mustcontain("""<span class="error-message">Please enter a value</span>""")
 
        response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
 

	
 
        def get_user():
 
            Session().query(User).filter(User.username == username).one()
 

	
 
        with pytest.raises(NoResultFound):
 
            get_user(), 'found user in database'
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_user'))
 

	
 
    @parametrize('name,attrs',
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
         ('extern_name', {'extern_name': 'test'}),
 
         ('extern_name', {'extern_name': None}),
 
         ('active', {'active': False}),
 
         ('active', {'active': True}),
 
         ('email', {'email': 'someemail@example.com'}),
 
        # ('new_password', {'new_password': 'foobar123',
 
        #                   'password_confirmation': 'foobar123'})
 
        ])
 
    def test_update(self, name, attrs):
 
        self.log_user()
 
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
 
                                  email='testme@example.com',
 
                                  extern_type='internal',
 
                                  extern_name=self.test_user_1,
 
                                  skip_if_exists=True)
 
        Session().commit()
 
        params = usr.get_api_data(True)
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update(attrs)
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            #cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            #cannot update this via form, expected value is original one
 
            params['extern_name'] = self.test_user_1
 
            # special case since this user is not
 
                                          # logged in yet his data is not filled
 
                                          # so we use creation data
 

	
 
        params.update({'_authentication_token': self.authentication_token()})
 
        response = self.app.post(url('update_user', id=usr.user_id), params)
 
        self.checkSessionFlash(response, 'User updated successfully')
 
        params.pop('_authentication_token')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        assert params == updated_params
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        username = 'newtestuserdeleteme'
 

	
 
        fixture.create_user(name=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
        self.checkSessionFlash(response, 'Successfully deleted user')
 

	
 
    def test_delete_repo_err(self):
 
        self.log_user()
 
        username = 'repoerr'
 
        reponame = u'repoerr_fail'
 

	
 
        fixture.create_user(name=username)
 
        fixture.create_repo(name=reponame, cur_user=username)
 

	
 
        new_user = Session().query(User) \
 
            .filter(User.username == username).one()
 
        response = self.app.post(url('delete_user', id=new_user.user_id),
 
            params={'_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'User "%s" still '
 
                               'owns 1 repositories and cannot be removed. '
 
                               'Switch owners or remove those repositories: '
 
                               '%s' % (username, reponame))
 

	
 
        response = self.app.post(url('delete_repo', repo_name=reponame),
 
            params={'_authentication_token': self.authentication_token()})
 
@@ -324,280 +325,281 @@ class TestAdminUsersController(TestContr
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == True
 
            assert UserModel().has_perm(uid, perm_create) == False
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_add_perm_fork_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.fork.none')
 
        perm_fork = Permission.get_by_key('hg.fork.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(create_repo_perm=True,
 
                                                 _authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == False
 
            assert UserModel().has_perm(uid, perm_create) == True
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_revoke_perm_fork_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.fork.none')
 
        perm_fork = Permission.get_by_key('hg.fork.repository')
 

	
 
        user = UserModel().create_or_update(username='dummy', password='qwe',
 
                                            email='dummy', firstname=u'a',
 
                                            lastname=u'b')
 
        Session().commit()
 
        uid = user.user_id
 

	
 
        try:
 
            #User should have None permission on creation repository
 
            assert UserModel().has_perm(user, perm_none) == False
 
            assert UserModel().has_perm(user, perm_fork) == False
 

	
 
            response = self.app.post(url('edit_user_perms_update', id=uid),
 
                                     params=dict(_authentication_token=self.authentication_token()))
 

	
 
            perm_none = Permission.get_by_key('hg.create.none')
 
            perm_create = Permission.get_by_key('hg.create.repository')
 

	
 
            #User should have None permission on creation repository
 
            assert UserModel().has_perm(uid, perm_none) == True
 
            assert UserModel().has_perm(uid, perm_create) == False
 
        finally:
 
            UserModel().delete(uid)
 
            Session().commit()
 

	
 
    def test_ips(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_ips', id=user.user_id))
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    @parametrize('test_name,ip,ip_range,failure', [
 
        ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
 
        ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
 
        ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
 
        ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
 
        ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
 
        ('127_bad_ip', 'foobar', 'foobar', True),
 
    ])
 
    def test_add_ip(self, test_name, ip, ip_range, failure, auto_clear_ip_permissions):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_ips_update', id=user_id),
 
                                 params=dict(new_ip=ip, _authentication_token=self.authentication_token()))
 

	
 
        if failure:
 
            self.checkSessionFlash(response, 'Please enter a valid IPv4 or IPv6 address')
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response.mustcontain(no=[ip])
 
            response.mustcontain(no=[ip_range])
 

	
 
        else:
 
            response = self.app.get(url('edit_user_ips', id=user_id))
 
            response.mustcontain(ip)
 
            response.mustcontain(ip_range)
 

	
 
    def test_delete_ip(self, auto_clear_ip_permissions):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        ip = '127.0.0.1/32'
 
        ip_range = '127.0.0.1 - 127.0.0.1'
 
        new_ip = UserModel().add_extra_ip(user_id, ip)
 
        Session().commit()
 
        new_ip_id = new_ip.ip_id
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response.mustcontain(ip)
 
        response.mustcontain(ip_range)
 

	
 
        self.app.post(url('edit_user_ips_delete', id=user_id),
 
                      params=dict(del_ip_id=new_ip_id, _authentication_token=self.authentication_token()))
 

	
 
        response = self.app.get(url('edit_user_ips', id=user_id))
 
        response.mustcontain('All IP addresses are allowed')
 
        response.mustcontain(no=[ip])
 
        response.mustcontain(no=[ip_range])
 

	
 
    def test_api_keys(self):
 
        self.log_user()
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @parametrize('desc,lifetime', [
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_add_api_keys(self, desc, lifetime):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys_update', id=user_id),
 
                 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(user_id)
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_remove_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys_update', id=user_id),
 
                {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        #now delete our key
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 1 == len(keys)
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 0 == len(keys)
 

	
 
    def test_reset_main_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        api_key = user.api_key
 
        response = self.app.get(url('edit_user_api_keys', id=user_id))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
 

	
 

	
 
class TestAdminUsersController_unittest(object):
 
class TestAdminUsersController_unittest(TestController):
 
    """ Unit tests for the users controller """
 

	
 
    def test_get_user_or_raise_if_default(self, monkeypatch):
 
        # flash complains about an non-existing session
 
        def flash_mock(*args, **kwargs):
 
            pass
 
        monkeypatch.setattr(h, 'flash', flash_mock)
 
        with test_context(self.app):
 
            # flash complains about an non-existing session
 
            def flash_mock(*args, **kwargs):
 
                pass
 
            monkeypatch.setattr(h, 'flash', flash_mock)
 

	
 
        u = UsersController()
 
        # a regular user should work correctly
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        assert u._get_user_or_raise_if_default(user.user_id) == user
 
        # the default user should raise
 
        with pytest.raises(HTTPNotFound):
 
            u._get_user_or_raise_if_default(User.get_default_user().user_id)
 
            u = UsersController()
 
            # a regular user should work correctly
 
            user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
            assert u._get_user_or_raise_if_default(user.user_id) == user
 
            # the default user should raise
 
            with pytest.raises(HTTPNotFound):
 
                u._get_user_or_raise_if_default(User.get_default_user().user_id)
 

	
 

	
 
class TestAdminUsersControllerForDefaultUser(TestController):
 
    """
 
    Edit actions on the default user are not allowed.
 
    Validate that they throw a 404 exception.
 
    """
 
    def test_edit_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user', id=user.user_id), status=404)
 

	
 
    def test_edit_advanced_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
 

	
 
    # API keys
 
    def test_edit_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
 

	
 
    def test_add_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_update', id=user.user_id),
 
                 {'_authentication_token': self.authentication_token()}, status=404)
 

	
 
    def test_delete_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user.user_id),
 
                 {'_authentication_token': self.authentication_token()}, status=404)
 

	
 
    # Permissions
 
    def test_edit_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
 

	
 
    def test_update_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_perms_update', id=user.user_id),
 
                 {'_authentication_token': self.authentication_token()}, status=404)
 

	
 
    # Emails
 
    def test_edit_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
 

	
 
    def test_add_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails_update', id=user.user_id),
 
                 {'_authentication_token': self.authentication_token()}, status=404)
 

	
 
    def test_delete_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_emails_delete', id=user.user_id),
 
                 {'_authentication_token': self.authentication_token()}, status=404)
 

	
 
    # IP addresses
 
    # Add/delete of IP addresses for the default user is used to maintain
 
    # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
 
    def test_edit_ip_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
kallithea/tests/models/test_notifications.py
Show inline comments
 
import os
 
import re
 

	
 
import mock
 
import routes.util
 

	
 
from kallithea.tests.base import *
 
from kallithea.lib import helpers as h
 
from kallithea.model.db import User, Notification, UserNotification
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 
from kallithea.model.notification import NotificationModel, EmailNotificationModel
 

	
 
import kallithea.lib.celerylib
 
import kallithea.lib.celerylib.tasks
 

	
 
from kallithea.tests.test_context import test_context
 

	
 
class TestNotifications(TestController):
 

	
 
    def setup_method(self, method):
 
        Session.remove()
 
        u1 = UserModel().create_or_update(username=u'u1',
 
                                        password=u'qweqwe',
 
                                        email=u'u1@example.com',
 
                                        firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        self.u1 = u1.user_id
 

	
 
        u2 = UserModel().create_or_update(username=u'u2',
 
                                        password=u'qweqwe',
 
                                        email=u'u2@example.com',
 
                                        firstname=u'u2', lastname=u'u3')
 
        Session().commit()
 
        self.u2 = u2.user_id
 

	
 
        u3 = UserModel().create_or_update(username=u'u3',
 
                                        password=u'qweqwe',
 
                                        email=u'u3@example.com',
 
                                        firstname=u'u3', lastname=u'u3')
 
        Session().commit()
 
        self.u3 = u3.user_id
 

	
 
        self.remove_all_notifications()
 
        assert [] == Notification.query().all()
 
        assert [] == UserNotification.query().all()
 

	
 
    def test_create_notification(self):
 
        usrs = [self.u1, self.u2]
 
        def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
            assert recipients == ['u2@example.com']
 
            assert subject == 'Test Message'
 
            assert body == u"hi there"
 
            assert '>hi there<' in html_body
 
            assert author.username == 'u1'
 
        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
            notification = NotificationModel().create(created_by=self.u1,
 
                                               subject=u'subj', body=u'hi there',
 
                                               recipients=usrs)
 
        Session().commit()
 
        u1 = User.get(self.u1)
 
        u2 = User.get(self.u2)
 
        u3 = User.get(self.u3)
 
        notifications = Notification.query().all()
 
        assert len(notifications) == 1
 
        with test_context(self.app):
 
            usrs = [self.u1, self.u2]
 
            def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
                assert recipients == ['u2@example.com']
 
                assert subject == 'Test Message'
 
                assert body == u"hi there"
 
                assert '>hi there<' in html_body
 
                assert author.username == 'u1'
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                notification = NotificationModel().create(created_by=self.u1,
 
                                                   subject=u'subj', body=u'hi there',
 
                                                   recipients=usrs)
 
                Session().commit()
 
                u1 = User.get(self.u1)
 
                u2 = User.get(self.u2)
 
                u3 = User.get(self.u3)
 
                notifications = Notification.query().all()
 
                assert len(notifications) == 1
 

	
 
        assert notifications[0].recipients == [u1, u2]
 
        assert notification.notification_id == notifications[0].notification_id
 
                assert notifications[0].recipients == [u1, u2]
 
                assert notification.notification_id == notifications[0].notification_id
 

	
 
        unotification = UserNotification.query() \
 
            .filter(UserNotification.notification == notification).all()
 
                unotification = UserNotification.query() \
 
                    .filter(UserNotification.notification == notification).all()
 

	
 
        assert len(unotification) == len(usrs)
 
        assert set([x.user_id for x in unotification]) == set(usrs)
 
                assert len(unotification) == len(usrs)
 
                assert set([x.user_id for x in unotification]) == set(usrs)
 

	
 
    def test_user_notifications(self):
 
        notification1 = NotificationModel().create(created_by=self.u1,
 
                                            subject=u'subj', body=u'hi there1',
 
                                            recipients=[self.u3])
 
        Session().commit()
 
        notification2 = NotificationModel().create(created_by=self.u1,
 
                                            subject=u'subj', body=u'hi there2',
 
                                            recipients=[self.u3])
 
        Session().commit()
 
        u3 = Session().query(User).get(self.u3)
 
        with test_context(self.app):
 
            notification1 = NotificationModel().create(created_by=self.u1,
 
                                                subject=u'subj', body=u'hi there1',
 
                                                recipients=[self.u3])
 
            Session().commit()
 
            notification2 = NotificationModel().create(created_by=self.u1,
 
                                                subject=u'subj', body=u'hi there2',
 
                                                recipients=[self.u3])
 
            Session().commit()
 
            u3 = Session().query(User).get(self.u3)
 

	
 
        assert sorted([x.notification for x in u3.notifications]) == sorted([notification2, notification1])
 
            assert sorted([x.notification for x in u3.notifications]) == sorted([notification2, notification1])
 

	
 
    def test_delete_notifications(self):
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 
        notifications = Notification.query().all()
 
        assert notification in notifications
 
        with test_context(self.app):
 
            notification = NotificationModel().create(created_by=self.u1,
 
                                               subject=u'title', body=u'hi there3',
 
                                        recipients=[self.u3, self.u1, self.u2])
 
            Session().commit()
 
            notifications = Notification.query().all()
 
            assert notification in notifications
 

	
 
        Notification.delete(notification.notification_id)
 
        Session().commit()
 
            Notification.delete(notification.notification_id)
 
            Session().commit()
 

	
 
        notifications = Notification.query().all()
 
        assert not notification in notifications
 
            notifications = Notification.query().all()
 
            assert not notification in notifications
 

	
 
        un = UserNotification.query().filter(UserNotification.notification
 
                                             == notification).all()
 
        assert un == []
 
            un = UserNotification.query().filter(UserNotification.notification
 
                                                 == notification).all()
 
            assert un == []
 

	
 
    def test_delete_association(self):
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 
        with test_context(self.app):
 
            notification = NotificationModel().create(created_by=self.u1,
 
                                               subject=u'title', body=u'hi there3',
 
                                        recipients=[self.u3, self.u1, self.u2])
 
            Session().commit()
 

	
 
        unotification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u3) \
 
                            .scalar()
 
            unotification = UserNotification.query() \
 
                                .filter(UserNotification.notification ==
 
                                        notification) \
 
                                .filter(UserNotification.user_id == self.u3) \
 
                                .scalar()
 

	
 
        assert unotification.user_id == self.u3
 
            assert unotification.user_id == self.u3
 

	
 
        NotificationModel().delete(self.u3,
 
                                   notification.notification_id)
 
        Session().commit()
 
            NotificationModel().delete(self.u3,
 
                                       notification.notification_id)
 
            Session().commit()
 

	
 
        u3notification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u3) \
 
                            .scalar()
 
            u3notification = UserNotification.query() \
 
                                .filter(UserNotification.notification ==
 
                                        notification) \
 
                                .filter(UserNotification.user_id == self.u3) \
 
                                .scalar()
 

	
 
        assert u3notification == None
 
            assert u3notification == None
 

	
 
        # notification object is still there
 
        assert Notification.query().all() == [notification]
 
            # notification object is still there
 
            assert Notification.query().all() == [notification]
 

	
 
        #u1 and u2 still have assignments
 
        u1notification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u1) \
 
                            .scalar()
 
        assert u1notification != None
 
        u2notification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u2) \
 
                            .scalar()
 
        assert u2notification != None
 
            #u1 and u2 still have assignments
 
            u1notification = UserNotification.query() \
 
                                .filter(UserNotification.notification ==
 
                                        notification) \
 
                                .filter(UserNotification.user_id == self.u1) \
 
                                .scalar()
 
            assert u1notification != None
 
            u2notification = UserNotification.query() \
 
                                .filter(UserNotification.notification ==
 
                                        notification) \
 
                                .filter(UserNotification.user_id == self.u2) \
 
                                .scalar()
 
            assert u2notification != None
 

	
 
    def test_notification_counter(self):
 
        NotificationModel().create(created_by=self.u1,
 
                            subject=u'title', body=u'hi there_delete',
 
                            recipients=[self.u3, self.u1])
 
        Session().commit()
 

	
 
        assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
 
        assert NotificationModel().get_unread_cnt_for_user(self.u2) == 0
 
        assert NotificationModel().get_unread_cnt_for_user(self.u3) == 1
 
        with test_context(self.app):
 
            NotificationModel().create(created_by=self.u1,
 
                                subject=u'title', body=u'hi there_delete',
 
                                recipients=[self.u3, self.u1])
 
            Session().commit()
 

	
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 
            assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
 
            assert NotificationModel().get_unread_cnt_for_user(self.u2) == 0
 
            assert NotificationModel().get_unread_cnt_for_user(self.u3) == 1
 

	
 
        assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
 
        assert NotificationModel().get_unread_cnt_for_user(self.u2) == 1
 
        assert NotificationModel().get_unread_cnt_for_user(self.u3) == 2
 
            notification = NotificationModel().create(created_by=self.u1,
 
                                               subject=u'title', body=u'hi there3',
 
                                        recipients=[self.u3, self.u1, self.u2])
 
            Session().commit()
 

	
 
            assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
 
            assert NotificationModel().get_unread_cnt_for_user(self.u2) == 1
 
            assert NotificationModel().get_unread_cnt_for_user(self.u3) == 2
 

	
 
    @mock.patch.object(h, 'canonical_url', (lambda arg, **kwargs: 'http://%s/?%s' % (arg, '&'.join('%s=%s' % (k, v) for (k, v) in sorted(kwargs.items())))))
 
    def test_dump_html_mails(self):
 
        # Exercise all notification types and dump them to one big html file
 
        l = []
 

	
 
        def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
            l.append('<hr/>\n')
 
            l.append('<h1>%s</h1>\n' % desc) # desc is from outer scope
 
            l.append('<pre>\n')
 
            l.append('From: %s\n' % author.username)
 
            l.append('To: %s\n' % ' '.join(recipients))
 
            l.append('Subject: %s\n' % subject)
 
            l.append('</pre>\n')
 
            l.append('<hr/>\n')
 
            l.append('<pre>%s</pre>\n' % body)
 
            l.append('<hr/>\n')
 
            l.append(html_body)
 
            l.append('<hr/>\n')
 

	
 
        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
            pr_kwargs = dict(
 
                pr_nice_id='#7',
 
                pr_title='The Title',
 
                pr_title_short='The Title',
 
                pr_url='http://pr.org/7',
 
                pr_target_repo='http://mainline.com/repo',
 
                pr_target_branch='trunk',
 
                pr_source_repo='https://dev.org/repo',
 
                pr_source_branch='devbranch',
 
                pr_owner=User.get(self.u2),
 
                pr_owner_username='u2'
 
                )
 
        with test_context(self.app):
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                pr_kwargs = dict(
 
                    pr_nice_id='#7',
 
                    pr_title='The Title',
 
                    pr_title_short='The Title',
 
                    pr_url='http://pr.org/7',
 
                    pr_target_repo='http://mainline.com/repo',
 
                    pr_target_branch='trunk',
 
                    pr_source_repo='https://dev.org/repo',
 
                    pr_source_branch='devbranch',
 
                    pr_owner=User.get(self.u2),
 
                    pr_owner_username='u2'
 
                    )
 

	
 
            for type_, body, kwargs in [
 
                (Notification.TYPE_CHANGESET_COMMENT,
 
                 u'This is the new comment.\n\n - and here it ends indented.',
 
                 dict(
 
                    short_id='cafe1234',
 
                    raw_id='cafe1234c0ffeecafe',
 
                    branch='brunch',
 
                    cs_comment_user='Opinionated User (jsmith)',
 
                    cs_comment_url='http://comment.org',
 
                    is_mention=[False, True],
 
                    message='This changeset did something clever which is hard to explain',
 
                    message_short='This changeset did something cl...',
 
                    status_change=[None, 'Approved'],
 
                    cs_target_repo='repo_target',
 
                    cs_url='http://changeset.com',
 
                    cs_author=User.get(self.u2))),
 
                (Notification.TYPE_MESSAGE,
 
                 u'This is the body of the test message\n - nothing interesting here except indentation.',
 
                 dict()),
 
                #(Notification.TYPE_MENTION, '$body', None), # not used
 
                (Notification.TYPE_REGISTRATION,
 
                 u'Registration body',
 
                 dict(
 
                    new_username='newbie',
 
                    registered_user_url='http://newbie.org',
 
                    new_email='new@email.com',
 
                    new_full_name='New Full Name')),
 
                (Notification.TYPE_PULL_REQUEST,
 
                 u'This PR is awesome because it does stuff\n - please approve indented!',
 
                 dict(
 
                    pr_user_created='Requesting User (root)', # pr_owner should perhaps be used for @mention in description ...
 
                    is_mention=[False, True],
 
                    pr_revisions=[('123abc'*7, "Introduce one and two\n\nand that's it"), ('567fed'*7, 'Make one plus two equal tree')],
 
                    org_repo_name='repo_org',
 
                    **pr_kwargs)),
 
                (Notification.TYPE_PULL_REQUEST_COMMENT,
 
                 u'Me too!\n\n - and indented on second line',
 
                 dict(
 
                    closing_pr=[False, True],
 
                    is_mention=[False, True],
 
                    pr_comment_user='Opinionated User (jsmith)',
 
                    pr_comment_url='http://pr.org/comment',
 
                    status_change=[None, 'Under Review'],
 
                    **pr_kwargs)),
 
                ]:
 
                kwargs['repo_name'] = u'repo/name'
 
                params = [(type_, type_, body, kwargs)]
 
                for param_name in ['is_mention', 'status_change', 'closing_pr']: # TODO: inline/general
 
                    if not isinstance(kwargs.get(param_name), list):
 
                        continue
 
                    new_params = []
 
                    for v in kwargs[param_name]:
 
                        for desc, type_, body, kwargs in params:
 
                            kwargs = dict(kwargs)
 
                            kwargs[param_name] = v
 
                            new_params.append(('%s, %s=%r' % (desc, param_name, v), type_, body, kwargs))
 
                    params = new_params
 
                for type_, body, kwargs in [
 
                    (Notification.TYPE_CHANGESET_COMMENT,
 
                     u'This is the new comment.\n\n - and here it ends indented.',
 
                     dict(
 
                        short_id='cafe1234',
 
                        raw_id='cafe1234c0ffeecafe',
 
                        branch='brunch',
 
                        cs_comment_user='Opinionated User (jsmith)',
 
                        cs_comment_url='http://comment.org',
 
                        is_mention=[False, True],
 
                        message='This changeset did something clever which is hard to explain',
 
                        message_short='This changeset did something cl...',
 
                        status_change=[None, 'Approved'],
 
                        cs_target_repo='repo_target',
 
                        cs_url='http://changeset.com',
 
                        cs_author=User.get(self.u2))),
 
                    (Notification.TYPE_MESSAGE,
 
                     u'This is the body of the test message\n - nothing interesting here except indentation.',
 
                     dict()),
 
                    #(Notification.TYPE_MENTION, '$body', None), # not used
 
                    (Notification.TYPE_REGISTRATION,
 
                     u'Registration body',
 
                     dict(
 
                        new_username='newbie',
 
                        registered_user_url='http://newbie.org',
 
                        new_email='new@email.com',
 
                        new_full_name='New Full Name')),
 
                    (Notification.TYPE_PULL_REQUEST,
 
                     u'This PR is awesome because it does stuff\n - please approve indented!',
 
                     dict(
 
                        pr_user_created='Requesting User (root)', # pr_owner should perhaps be used for @mention in description ...
 
                        is_mention=[False, True],
 
                        pr_revisions=[('123abc'*7, "Introduce one and two\n\nand that's it"), ('567fed'*7, 'Make one plus two equal tree')],
 
                        org_repo_name='repo_org',
 
                        **pr_kwargs)),
 
                    (Notification.TYPE_PULL_REQUEST_COMMENT,
 
                     u'Me too!\n\n - and indented on second line',
 
                     dict(
 
                        closing_pr=[False, True],
 
                        is_mention=[False, True],
 
                        pr_comment_user='Opinionated User (jsmith)',
 
                        pr_comment_url='http://pr.org/comment',
 
                        status_change=[None, 'Under Review'],
 
                        **pr_kwargs)),
 
                    ]:
 
                    kwargs['repo_name'] = u'repo/name'
 
                    params = [(type_, type_, body, kwargs)]
 
                    for param_name in ['is_mention', 'status_change', 'closing_pr']: # TODO: inline/general
 
                        if not isinstance(kwargs.get(param_name), list):
 
                            continue
 
                        new_params = []
 
                        for v in kwargs[param_name]:
 
                            for desc, type_, body, kwargs in params:
 
                                kwargs = dict(kwargs)
 
                                kwargs[param_name] = v
 
                                new_params.append(('%s, %s=%r' % (desc, param_name, v), type_, body, kwargs))
 
                        params = new_params
 

	
 
                for desc, type_, body, kwargs in params:
 
                    # desc is used as "global" variable
 
                    notification = NotificationModel().create(created_by=self.u1,
 
                                                       subject=u'unused', body=body, email_kwargs=kwargs,
 
                                                       recipients=[self.u2], type_=type_)
 
                    for desc, type_, body, kwargs in params:
 
                        # desc is used as "global" variable
 
                        notification = NotificationModel().create(created_by=self.u1,
 
                                                           subject=u'unused', body=body, email_kwargs=kwargs,
 
                                                           recipients=[self.u2], type_=type_)
 

	
 
            # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
 
            desc = 'TYPE_PASSWORD_RESET'
 
            kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
 
            kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
 
                "Password reset link",
 
                EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
 
                EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
 
                author=User.get(self.u1))
 
                # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
 
                desc = 'TYPE_PASSWORD_RESET'
 
                kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
 
                kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
 
                    "Password reset link",
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
 
                    author=User.get(self.u1))
 

	
 
        out = '<!doctype html>\n<html lang="en">\n<head><title>Notifications</title><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head>\n<body>\n%s\n</body>\n</html>\n' % \
 
            re.sub(r'<(/?(?:!doctype|html|head|title|meta|body)\b[^>]*)>', r'<!--\1-->', ''.join(l))
 

	
 
        outfn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.out.html')
 
        reffn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.ref.html')
 
        with file(outfn, 'w') as f:
 
            f.write(out)
 
        with file(reffn) as f:
 
            ref = f.read()
 
        assert ref == out # copy test_dump_html_mails.out.html to test_dump_html_mails.ref.html to update expectations
 
        os.unlink(outfn)
kallithea/tests/other/test_libs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.other.test_libs
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Package for testing various lib/helper functions in kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 9, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import hashlib
 
import mock
 
from kallithea.tests.base import *
 
from kallithea.lib.utils2 import AttributeDict
 
from kallithea.model.db import Repository
 
from kallithea.tests.test_context import test_context
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
 
     '%s://example.com' % proto),
 
    ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
 
                                                '8080'],
 
     '%s://example.com:8080' % proto),
 
]
 

	
 
proto = 'https'
 
TEST_URLS += [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
 
     '%s://example.com' % proto),
 
    ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
 
                                                '8080'],
 
     '%s://example.com:8080' % proto),
 
]
 

	
 
class FakeUrlGenerator(object):
 

	
 
    def __init__(self, current_url=None, default_route=None, **routes):
 
        """Initialize using specified 'current' URL template,
 
        default route template, and all other aguments describing known
 
        routes (format: route=template)"""
 
        self.current_url = current_url
 
        self.default_route = default_route
 
        self.routes = routes
 

	
 
    def __call__(self, route_name, *args, **kwargs):
 
        if route_name in self.routes:
 
            return self.routes[route_name] % kwargs
 

	
 
        return self.default_route % kwargs
 

	
 
    def current(self, *args, **kwargs):
 
        return self.current_url % kwargs
 

	
 
class TestLibs(TestController):
 

	
 
    @parametrize('test_url,expected,expected_creds', TEST_URLS)
 
    def test_uri_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import uri_filter
 
        assert uri_filter(test_url) == expected
 

	
 
    @parametrize('test_url,expected,expected_creds', TEST_URLS)
 
    def test_credentials_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import credentials_filter
 
        assert credentials_filter(test_url) == expected_creds
 

	
 
    @parametrize('str_bool,expected', [
 
                           ('t', True),
 
                           ('true', True),
 
                           ('y', True),
 
                           ('yes', True),
 
                           ('on', True),
 
                           ('1', True),
 
                           ('Y', True),
 
                           ('yeS', True),
 
                           ('Y', True),
 
                           ('TRUE', True),
 
                           ('T', True),
 
                           ('False', False),
 
                           ('F', False),
 
                           ('FALSE', False),
 
                           ('0', False),
 
                           ('-1', False),
 
                           ('', False)
 
    ])
 
    def test_str2bool(self, str_bool, expected):
 
        from kallithea.lib.utils2 import str2bool
 
        assert str2bool(str_bool) == expected
 

	
 
    def test_mention_extractor(self):
 
        from kallithea.lib.utils2 import extract_mentioned_usernames
 
        sample = (
 
            "@first hi there @world here's my email username@example.com "
 
            "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
 
            "@UPPER    @cAmEL @2one_more22 @john please see this http://org.pl "
 
            "@marian.user just do it @marco-polo and next extract @marco_polo "
 
            "user.dot  hej ! not-needed maril@example.com"
 
        )
 

	
 
        expected = set([
 
            '2one_more22', 'first', 'lukaszb', 'one', 'one_more22', 'UPPER', 'cAmEL', 'john',
 
            'marian.user', 'marco-polo', 'marco_polo', 'world'])
 
        assert expected == set(extract_mentioned_usernames(sample))
 

	
 
    @parametrize('age_args,expected', [
 
        (dict(), u'just now'),
 
        (dict(seconds= -1), u'1 second ago'),
 
        (dict(seconds= -60 * 2), u'2 minutes ago'),
 
        (dict(hours= -1), u'1 hour ago'),
 
        (dict(hours= -24), u'1 day ago'),
 
        (dict(hours= -24 * 5), u'5 days ago'),
 
        (dict(months= -1), u'1 month ago'),
 
        (dict(months= -1, days= -2), u'1 month and 2 days ago'),
 
        (dict(months= -1, days= -20), u'1 month and 19 days ago'),
 
        (dict(years= -1, months= -1), u'1 year and 1 month ago'),
 
        (dict(years= -1, months= -10), u'1 year and 10 months ago'),
 
        (dict(years= -2, months= -4), u'2 years and 4 months ago'),
 
        (dict(years= -2, months= -11), u'2 years and 11 months ago'),
 
        (dict(years= -3, months= -2), u'3 years and 2 months ago'),
 
    ])
 
    def test_age(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        n = datetime.datetime(year=2012, month=5, day=17)
 
        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
        assert age(n + delt(**age_args), now=n) == expected
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    @parametrize('age_args,expected', [
 
        (dict(), u'just now'),
 
        (dict(seconds= -1), u'1 second ago'),
 
        (dict(seconds= -60 * 2), u'2 minutes ago'),
 
        (dict(hours= -1), u'1 hour ago'),
 
        (dict(hours= -24), u'1 day ago'),
 
        (dict(hours= -24 * 5), u'5 days ago'),
 
        (dict(months= -1), u'1 month ago'),
 
        (dict(months= -1, days= -2), u'1 month ago'),
 
        (dict(months= -1, days= -20), u'1 month ago'),
 
        (dict(years= -1, months= -1), u'13 months ago'),
 
        (dict(years= -1, months= -10), u'22 months ago'),
 
        (dict(years= -2, months= -4), u'2 years ago'),
 
        (dict(years= -2, months= -11), u'3 years ago'),
 
        (dict(years= -3, months= -2), u'3 years ago'),
 
        (dict(years= -4, months= -8), u'5 years ago'),
 
    ])
 
    def test_age_short(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        n = datetime.datetime(year=2012, month=5, day=17)
 
        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
        assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
 

	
 
    @parametrize('age_args,expected', [
 
        (dict(), u'just now'),
 
        (dict(seconds=1), u'in 1 second'),
 
        (dict(seconds=60 * 2), u'in 2 minutes'),
 
        (dict(hours=1), u'in 1 hour'),
 
        (dict(hours=24), u'in 1 day'),
 
        (dict(hours=24 * 5), u'in 5 days'),
 
        (dict(months=1), u'in 1 month'),
 
        (dict(months=1, days=1), u'in 1 month and 1 day'),
 
        (dict(years=1, months=1), u'in 1 year and 1 month')
 
    ])
 
    def test_age_in_future(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        n = datetime.datetime(year=2012, month=5, day=17)
 
        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
        assert age(n + delt(**age_args), now=n) == expected
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    def test_tag_extractor(self):
 
        sample = (
 
            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
 
            "[requires] [stale] [see<>=>] [see => http://example.com]"
 
            "[requires => url] [lang => python] [just a tag]"
 
            "[,d] [ => ULR ] [obsolete] [desc]]"
 
        )
 
        from kallithea.lib.helpers import urlify_text
 
        res = urlify_text(sample, stylize=True)
 
        assert '<div class="metatag" tag="tag">tag</div>' in res
 
        assert '<div class="metatag" tag="obsolete">obsolete</div>' in res
 
        assert '<div class="metatag" tag="stale">stale</div>' in res
 
        assert '<div class="metatag" tag="lang">python</div>' in res
 
        assert '<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res
 
        assert '<div class="metatag" tag="tag">tag</div>' in res
 

	
 
    def test_alternative_gravatar(self):
 
        from kallithea.lib.helpers import gravatar_url
 
        _md5 = lambda s: hashlib.md5(s).hexdigest()
 

	
 
        #mock pylons.tmpl_context
 
        def fake_tmpl_context(_url):
 
            _c = AttributeDict()
 
            _c.visual = AttributeDict()
 
            _c.visual.use_gravatar = True
 
            _c.visual.gravatar_url = _url
 

	
 
            return _c
 

	
 
        fake_url = FakeUrlGenerator(current_url='https://example.com')
 
        with mock.patch('kallithea.config.routing.url', fake_url):
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('pylons.tmpl_context', fake):
 
                    from kallithea.config.routing import url
 
                    assert url.current() == 'https://example.com'
 
                    grav = gravatar_url(email_address='test@example.com', size=24)
 
                    assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('pylons.tmpl_context', fake):
 
                grav = gravatar_url(email_address='test@example.com', size=24)
 
                assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}')
 
            with mock.patch('pylons.tmpl_context', fake):
 
                em = 'test@example.com'
 
                grav = gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s' % (_md5(em))
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}/{size}')
 
            with mock.patch('pylons.tmpl_context', fake):
 
                em = 'test@example.com'
 
                grav = gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s/%s' % (_md5(em), 24)
 

	
 
            fake = fake_tmpl_context(_url='{scheme}://{netloc}/{md5email}/{size}')
 
            with mock.patch('pylons.tmpl_context', fake):
 
                em = 'test@example.com'
 
                grav = gravatar_url(email_address=em, size=24)
 
                assert grav == 'https://example.com/%s/%s' % (_md5(em), 24)
 

	
 
    @parametrize('tmpl,repo_name,overrides,prefix,expected', [
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '', 'http://vps1:8000/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'username'}, '', 'http://username@vps1:8000/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '/prefix', 'http://vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/prefix', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'username'}, '/prefix', 'http://username@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/prefix/', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'username'}, '/prefix/', 'http://username@vps1:8000/prefix/group/repo1'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {}, '', 'http://vps1:8000/_23'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'username'}, '', 'http://username@vps1:8000/_23'),
 
        ('http://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'username'}, '', 'http://username@vps1:8000/_23'),
 
        ('http://{netloc}/_{repoid}', 'group/repo1', {'user': 'username'}, '', 'http://vps1:8000/_23'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', {'user': 'username'}, '', 'https://username@proxy1.example.com/group/repo1'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', {}, '', 'https://proxy1.example.com/group/repo1'),
 
        ('https://proxy1.example.com/{user}/{repo}', 'group/repo1', {'user': 'username'}, '', 'https://proxy1.example.com/username/group/repo1'),
 
    ])
 
    def test_clone_url_generator(self, tmpl, repo_name, overrides, prefix, expected):
 
        from kallithea.lib.utils2 import get_clone_url
 
        clone_url = get_clone_url(uri_tmpl=tmpl, qualified_home_url='http://vps1:8000'+prefix,
 
                                  repo_name=repo_name, repo_id=23, **overrides)
 
        assert clone_url == expected
 

	
 
    def _quick_url(self, text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None):
 
        """
 
        Changes `some text url[foo]` => `some text <a href="/">foo</a>
 

	
 
        :param text:
 
        """
 
        import re
 
        # quickly change expected url[] into a link
 
        URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
 

	
 
        def url_func(match_obj):
 
            _url = match_obj.groups()[0]
 
            return tmpl % (url_ or '/repo_name/changeset/%s' % _url, _url)
 
        return URL_PAT.sub(url_func, text)
 

	
 
    @parametrize('sample,expected', [
 
      ("",
 
       ""),
 
      ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
 
       """git-svn-id: <a href="https://svn.apache.org/repos/asf/libcloud/trunk@1441655">https://svn.apache.org/repos/asf/libcloud/trunk@1441655</a> 13f79535-47bb-0310-9956-ffa450edef68"""),
 
      ("from rev 000000000000",
 
       """from rev url[000000000000]"""),
 
      ("from rev 000000000000123123 also rev 000000000000",
 
       """from rev url[000000000000123123] also rev url[000000000000]"""),
 
      ("this should-000 00",
 
       """this should-000 00"""),
 
      ("longtextffffffffff rev 123123123123",
 
       """longtextffffffffff rev url[123123123123]"""),
 
      ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
 
       """rev ffffffffffffffffffffffffffffffffffffffffffffffffff"""),
 
      ("ffffffffffff some text traalaa",
 
       """url[ffffffffffff] some text traalaa"""),
 
       ("""Multi line
 
       123123123123
 
       some text 123123123123
 
       sometimes !
 
       """,
 
       """Multi line<br/>"""
 
       """       url[123123123123]<br/>"""
 
       """       some text url[123123123123]<br/>"""
 
       """       sometimes !"""),
 
    ])
 
    def test_urlify_text(self, sample, expected):
 
        expected = self._quick_url(expected)
 
        fake_url = FakeUrlGenerator(changeset_home='/%(repo_name)s/changeset/%(revision)s')
 
        with mock.patch('kallithea.config.routing.url', fake_url):
 
            from kallithea.lib.helpers import urlify_text
 
            assert urlify_text(sample, 'repo_name') == expected
 

	
 
    @parametrize('sample,expected,url_', [
 
      ("",
 
       "",
 
       ""),
 
      ("https://svn.apache.org/repos",
 
       """url[https://svn.apache.org/repos]""",
 
       "https://svn.apache.org/repos"),
 
      ("http://svn.apache.org/repos",
 
       """url[http://svn.apache.org/repos]""",
 
       "http://svn.apache.org/repos"),
 
      ("from rev a also rev http://google.com",
 
       """from rev a also rev url[http://google.com]""",
 
       "http://google.com"),
 
      ("http://imgur.com/foo.gif inline http://imgur.com/foo.gif ending http://imgur.com/foo.gif",
 
       """url[http://imgur.com/foo.gif] inline url[http://imgur.com/foo.gif] ending url[http://imgur.com/foo.gif]""",
 
       "http://imgur.com/foo.gif"),
 
      ("""Multi line
 
       https://foo.bar.example.com
 
       some text lalala""",
 
       """Multi line<br/>"""
 
       """       url[https://foo.bar.example.com]<br/>"""
 
       """       some text lalala""",
 
       "https://foo.bar.example.com"),
 
      ("@mention @someone",
 
       """<b>@mention</b> <b>@someone</b>""",
 
       ""),
 
      ("deadbeefcafe 123412341234",
 
       """<a class="revision-link" href="/repo_name/changeset/deadbeefcafe">deadbeefcafe</a> <a class="revision-link" href="/repo_name/changeset/123412341234">123412341234</a>""",
 
       ""),
 
      ("We support * markup for *bold* markup of *single or multiple* words, "
 
       "*a bit @like http://slack.com*. "
 
       "The first * must come after whitespace and not be followed by whitespace, "
 
       "contain anything but * and newline until the next *, "
 
       "which must not come after whitespace "
 
       "and not be followed by * or alphanumerical *characters*.",
 
       """We support * markup for <b>*bold*</b> markup of <b>*single or multiple*</b> words, """
 
       """<b>*a bit <b>@like</b> <a href="http://slack.com">http://slack.com</a>*</b>. """
 
       """The first * must come after whitespace and not be followed by whitespace, """
 
       """contain anything but * and newline until the next *, """
 
       """which must not come after whitespace """
 
       """and not be followed by * or alphanumerical <b>*characters*</b>.""",
 
       "-"),
 
      # tags are covered by test_tag_extractor
 
    ])
 
    def test_urlify_test(self, sample, expected, url_):
 
        expected = self._quick_url(expected,
 
                                   tmpl="""<a href="%s">%s</a>""", url_=url_)
 
        fake_url = FakeUrlGenerator(changeset_home='/%(repo_name)s/changeset/%(revision)s')
 
        with mock.patch('kallithea.config.routing.url', fake_url):
 
            from kallithea.lib.helpers import urlify_text
 
            assert urlify_text(sample, 'repo_name', stylize=True) == expected
 

	
 
    @parametrize('sample,expected', [
 
      ("deadbeefcafe @mention, and http://foo.bar/ yo",
 
       """<a class="revision-link" href="/repo_name/changeset/deadbeefcafe">deadbeefcafe</a>"""
 
       """<a class="message-link" href="#the-link"> <b>@mention</b>, and </a>"""
 
       """<a href="http://foo.bar/">http://foo.bar/</a>"""
 
       """<a class="message-link" href="#the-link"> yo</a>"""),
 
    ])
0 comments (0 inline, 0 general)