Changeset - 8e3137064ab6
[Not reviewed]
default
0 4 0
Thomas De Schampheleire - 9 years ago 2016-11-22 09:04:45
thomas.de.schampheleire@gmail.com
tests: use test_context for tests needing internationalization

Instead of relying on the top-level handling of the translator, use the
newly introduced test_context.
4 files changed with 296 insertions and 274 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/functional/test_admin_notifications.py
Show inline comments
 
from kallithea.tests.base import *
 
from kallithea.model.db import User
 

	
 
from kallithea.model.user import UserModel
 
from kallithea.model.notification import NotificationModel
 
from kallithea.model.meta import Session
 
from kallithea.lib import helpers as h
 

	
 
from kallithea.tests.test_context import test_context
 

	
 
class TestNotificationsController(TestController):
 
    def setup_method(self, method):
 
        self.remove_all_notifications()
 

	
 
    def test_index(self, create_test_user):
 
        self.log_user()
 

	
 
        u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                   email='u1@example.com',
 
                                   firstname=u'u1', lastname=u'u1',
 
                                   active=True))
 
        u1 = u1.user_id
 
        Session().commit()
 

	
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('<div class="table">No notifications here yet</div>')
 

	
 
        cur_user = self._get_logged_user()
 
        notif = NotificationModel().create(created_by=u1, subject=u'test_notification_1',
 
                                           body=u'notification_1', recipients=[cur_user])
 
        Session().commit()
 
        with test_context(self.app):
 
            cur_user = self._get_logged_user()
 
            notif = NotificationModel().create(created_by=u1, subject=u'test_notification_1',
 
                                               body=u'notification_1', recipients=[cur_user])
 
            Session().commit()
 

	
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('id="notification_%s"' % notif.notification_id)
 

	
 
    def test_delete(self, create_test_user):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 

	
 
        u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                   email='u1@example.com',
 
                                   firstname=u'u1', lastname=u'u1',
 
                                   active=True))
 
        u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                   email='u2@example.com',
 
                                   firstname=u'u2', lastname=u'u2',
 
                                   active=True))
 
        with test_context(self.app):
 
            u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                       email='u1@example.com',
 
                                       firstname=u'u1', lastname=u'u1',
 
                                       active=True))
 
            u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                       email='u2@example.com',
 
                                       firstname=u'u2', lastname=u'u2',
 
                                       active=True))
 

	
 
        # make notifications
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=u'test',
 
                                                  body=u'hi there',
 
                                                  recipients=[cur_user, u1, u2])
 
        Session().commit()
 
        u1 = User.get(u1.user_id)
 
        u2 = User.get(u2.user_id)
 
            # make notifications
 
            notification = NotificationModel().create(created_by=cur_user,
 
                                                      subject=u'test',
 
                                                      body=u'hi there',
 
                                                      recipients=[cur_user, u1, u2])
 
            Session().commit()
 
            u1 = User.get(u1.user_id)
 
            u2 = User.get(u2.user_id)
 

	
 
        # check DB
 
        get_notif = lambda un: [x.notification for x in un]
 
        assert get_notif(cur_user.notifications) == [notification]
 
        assert get_notif(u1.notifications) == [notification]
 
        assert get_notif(u2.notifications) == [notification]
 
        cur_usr_id = cur_user.user_id
 

	
 
        response = self.app.post(
 
            url('notification_delete', notification_id=notification.notification_id),
 
            params={'_authentication_token': self.authentication_token()})
 
        assert response.body == 'ok'
 

	
 
        cur_user = User.get(cur_usr_id)
 
        assert cur_user.notifications == []
 

	
 
    def test_show(self, create_test_user):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                   email='u1@example.com',
 
                                   firstname=u'u1', lastname=u'u1',
 
                                   active=True))
 
        u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                   email='u2@example.com',
 
                                   firstname=u'u2', lastname=u'u2',
 
                                   active=True))
 
        Session().commit()
 
        with test_context(self.app):
 
            cur_user = self._get_logged_user()
 
            u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                       email='u1@example.com',
 
                                       firstname=u'u1', lastname=u'u1',
 
                                       active=True))
 
            u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                       email='u2@example.com',
 
                                       firstname=u'u2', lastname=u'u2',
 
                                       active=True))
 
            Session().commit()
 

	
 
        subject = u'test'
 
        notif_body = u'hi there'
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=subject,
 
                                                  body=notif_body,
 
                                                  recipients=[cur_user, u1, u2])
 
            subject = u'test'
 
            notif_body = u'hi there'
 
            notification = NotificationModel().create(created_by=cur_user,
 
                                                      subject=subject,
 
                                                      body=notif_body,
 
                                                      recipients=[cur_user, u1, u2])
 

	
 
        response = self.app.get(url('notification',
 
                                    notification_id=notification.notification_id))
 

	
 
        response.mustcontain(subject)
 
        response.mustcontain(notif_body)
 

	
 
    def test_description_with_age(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 
        with test_context(self.app):
 
            cur_user = self._get_logged_user()
 
            subject = u'test'
 
            notify_body = u'hi there'
 

	
 
        description = NotificationModel().make_description(notification)
 
        assert description == "{0} sent message {1}".format(
 
                cur_user.username,
 
                h.age(notification.created_on)
 
                )
 
            notification = NotificationModel().create(created_by = cur_user,
 
                                                      subject    = subject,
 
                                                      body       = notify_body)
 

	
 
            description = NotificationModel().make_description(notification)
 
            assert description == "{0} sent message {1}".format(
 
                    cur_user.username,
 
                    h.age(notification.created_on)
 
                    )
 

	
 
    def test_description_with_datetime(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 
        with test_context(self.app):
 
            cur_user = self._get_logged_user()
 
            subject = u'test'
 
            notify_body = u'hi there'
 
            notification = NotificationModel().create(created_by = cur_user,
 
                                                      subject    = subject,
 
                                                      body       = notify_body)
 

	
 
        description = NotificationModel().make_description(notification, False)
 
        assert description == "{0} sent message at {1}".format(
 
                cur_user.username,
 
                h.fmt_date(notification.created_on)
 
                )
 
            description = NotificationModel().make_description(notification, False)
 
            assert description == "{0} sent message at {1}".format(
 
                    cur_user.username,
 
                    h.fmt_date(notification.created_on)
 
                    )
 

	
 
    def test_mark_all_read(self, create_test_user):
 
        self.log_user()
 
        u0 = self._get_logged_user()
 
        u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                   email='u1@example.com',
 
                                   firstname=u'u1', lastname=u'u1',
 
                                   active=True))
 
        u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                   email='u2@example.com',
 
                                   firstname=u'u2', lastname=u'u2',
 
                                   active=True))
 
        notif = NotificationModel().create(created_by=u1,
 
                                           subject=u'subject',
 
                                           body=u'body',
 
                                           recipients=[u0, u2])
 
        u0_id, u1_id, u2_id = u0.user_id, u1.user_id, u2.user_id
 
        with test_context(self.app):
 
            u0 = self._get_logged_user()
 
            u1 = create_test_user(dict(username='u1', password='qweqwe',
 
                                       email='u1@example.com',
 
                                       firstname=u'u1', lastname=u'u1',
 
                                       active=True))
 
            u2 = create_test_user(dict(username='u2', password='qweqwe',
 
                                       email='u2@example.com',
 
                                       firstname=u'u2', lastname=u'u2',
 
                                       active=True))
 
            notif = NotificationModel().create(created_by=u1,
 
                                               subject=u'subject',
 
                                               body=u'body',
 
                                               recipients=[u0, u2])
 
            u0_id, u1_id, u2_id = u0.user_id, u1.user_id, u2.user_id
 

	
 
        assert [n.read for n in u0.notifications] == [False]
 
        assert u1.notifications == []
 
        assert [n.read for n in u2.notifications] == [False]
 
            assert [n.read for n in u0.notifications] == [False]
 
            assert u1.notifications == []
 
            assert [n.read for n in u2.notifications] == [False]
 

	
 
        # Mark all read for current user.
 

	
 
        response = self.app.get(url('notifications_mark_all_read'), # TODO: should be POST
 
                                extra_environ=dict(HTTP_X_PARTIAL_XHR='1'))
 

	
 
        assert response.status_int == 200
 
        response.mustcontain('id="notification_%s"' % notif.notification_id)
 

	
 
        u0 = User.get(u0_id)
 
        u1 = User.get(u1_id)
 
        u2 = User.get(u2_id)
 

	
 
        assert [n.read for n in u0.notifications] == [True]
 
        assert u1.notifications == []
 
        assert [n.read for n in u2.notifications] == [False]
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from sqlalchemy.orm.exc import NoResultFound, ObjectDeletedError
 

	
 
import pytest
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.tests.test_context import test_context
 
from kallithea.controllers.admin.users import UsersController
 
from kallithea.model.db import User, Permission, UserIpMap, UserApiKeys
 
from kallithea.lib.auth import check_password
 
from kallithea.model.user import UserModel
 
from kallithea.model import validators
 
from kallithea.lib import helpers as h
 
from kallithea.model.meta import Session
 
from webob.exc import HTTPNotFound
 

	
 
fixture = Fixture()
 

	
 
@pytest.yield_fixture
 
def user_and_repo_group_fail():
 
    username = 'repogrouperr'
 
    groupname = u'repogroup_fail'
 
    user = fixture.create_user(name=username)
 
    repo_group = fixture.create_repo_group(name=groupname, cur_user=username)
 
    yield user, repo_group
 
    # cleanup
 
    try:
 
        fixture.destroy_repo_group(repo_group)
 
    except ObjectDeletedError:
 
        # delete already succeeded in test body
 
        pass
 

	
 
class TestAdminUsersController(TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('users'))
 
        # TODO: Test response...
 

	
 
    def test_create(self):
 
        self.log_user()
 
        username = 'newtestuser'
 
        password = 'test12'
 
        password_confirmation = password
 
        name = u'name'
 
        lastname = u'lastname'
 
        email = 'mail@example.com'
 

	
 
@@ -468,112 +469,113 @@ class TestAdminUsersController(TestContr
 
                 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(user_id)
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_remove_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 

	
 
        response = self.app.post(url('edit_user_api_keys_update', id=user_id),
 
                {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        #now delete our key
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 1 == len(keys)
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
 
        assert 0 == len(keys)
 

	
 
    def test_reset_main_api_key(self):
 
        self.log_user()
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        api_key = user.api_key
 
        response = self.app.get(url('edit_user_api_keys', id=user_id))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user_id),
 
                 {'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
 

	
 

	
 
class TestAdminUsersController_unittest(object):
 
class TestAdminUsersController_unittest(TestController):
 
    """ Unit tests for the users controller """
 

	
 
    def test_get_user_or_raise_if_default(self, monkeypatch):
 
        # flash complains about an non-existing session
 
        def flash_mock(*args, **kwargs):
 
            pass
 
        monkeypatch.setattr(h, 'flash', flash_mock)
 
        with test_context(self.app):
 
            # flash complains about an non-existing session
 
            def flash_mock(*args, **kwargs):
 
                pass
 
            monkeypatch.setattr(h, 'flash', flash_mock)
 

	
 
        u = UsersController()
 
        # a regular user should work correctly
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        assert u._get_user_or_raise_if_default(user.user_id) == user
 
        # the default user should raise
 
        with pytest.raises(HTTPNotFound):
 
            u._get_user_or_raise_if_default(User.get_default_user().user_id)
 
            u = UsersController()
 
            # a regular user should work correctly
 
            user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
            assert u._get_user_or_raise_if_default(user.user_id) == user
 
            # the default user should raise
 
            with pytest.raises(HTTPNotFound):
 
                u._get_user_or_raise_if_default(User.get_default_user().user_id)
 

	
 

	
 
class TestAdminUsersControllerForDefaultUser(TestController):
 
    """
 
    Edit actions on the default user are not allowed.
 
    Validate that they throw a 404 exception.
 
    """
 
    def test_edit_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user', id=user.user_id), status=404)
 

	
 
    def test_edit_advanced_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
 

	
 
    # API keys
 
    def test_edit_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
 

	
 
    def test_add_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_update', id=user.user_id),
 
                 {'_authentication_token': self.authentication_token()}, status=404)
 

	
 
    def test_delete_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys_delete', id=user.user_id),
 
                 {'_authentication_token': self.authentication_token()}, status=404)
 

	
 
    # Permissions
 
    def test_edit_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
 

	
 
    def test_update_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_perms_update', id=user.user_id),
 
                 {'_authentication_token': self.authentication_token()}, status=404)
 

	
 
    # Emails
kallithea/tests/models/test_notifications.py
Show inline comments
 
import os
 
import re
 

	
 
import mock
 
import routes.util
 

	
 
from kallithea.tests.base import *
 
from kallithea.lib import helpers as h
 
from kallithea.model.db import User, Notification, UserNotification
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 
from kallithea.model.notification import NotificationModel, EmailNotificationModel
 

	
 
import kallithea.lib.celerylib
 
import kallithea.lib.celerylib.tasks
 

	
 
from kallithea.tests.test_context import test_context
 

	
 
class TestNotifications(TestController):
 

	
 
    def setup_method(self, method):
 
        Session.remove()
 
        u1 = UserModel().create_or_update(username=u'u1',
 
                                        password=u'qweqwe',
 
                                        email=u'u1@example.com',
 
                                        firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        self.u1 = u1.user_id
 

	
 
        u2 = UserModel().create_or_update(username=u'u2',
 
                                        password=u'qweqwe',
 
                                        email=u'u2@example.com',
 
                                        firstname=u'u2', lastname=u'u3')
 
        Session().commit()
 
        self.u2 = u2.user_id
 

	
 
        u3 = UserModel().create_or_update(username=u'u3',
 
                                        password=u'qweqwe',
 
                                        email=u'u3@example.com',
 
                                        firstname=u'u3', lastname=u'u3')
 
        Session().commit()
 
        self.u3 = u3.user_id
 

	
 
        self.remove_all_notifications()
 
        assert [] == Notification.query().all()
 
        assert [] == UserNotification.query().all()
 

	
 
    def test_create_notification(self):
 
        usrs = [self.u1, self.u2]
 
        def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
            assert recipients == ['u2@example.com']
 
            assert subject == 'Test Message'
 
            assert body == u"hi there"
 
            assert '>hi there<' in html_body
 
            assert author.username == 'u1'
 
        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
            notification = NotificationModel().create(created_by=self.u1,
 
                                               subject=u'subj', body=u'hi there',
 
                                               recipients=usrs)
 
        Session().commit()
 
        u1 = User.get(self.u1)
 
        u2 = User.get(self.u2)
 
        u3 = User.get(self.u3)
 
        notifications = Notification.query().all()
 
        assert len(notifications) == 1
 
        with test_context(self.app):
 
            usrs = [self.u1, self.u2]
 
            def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
                assert recipients == ['u2@example.com']
 
                assert subject == 'Test Message'
 
                assert body == u"hi there"
 
                assert '>hi there<' in html_body
 
                assert author.username == 'u1'
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                notification = NotificationModel().create(created_by=self.u1,
 
                                                   subject=u'subj', body=u'hi there',
 
                                                   recipients=usrs)
 
                Session().commit()
 
                u1 = User.get(self.u1)
 
                u2 = User.get(self.u2)
 
                u3 = User.get(self.u3)
 
                notifications = Notification.query().all()
 
                assert len(notifications) == 1
 

	
 
        assert notifications[0].recipients == [u1, u2]
 
        assert notification.notification_id == notifications[0].notification_id
 
                assert notifications[0].recipients == [u1, u2]
 
                assert notification.notification_id == notifications[0].notification_id
 

	
 
        unotification = UserNotification.query() \
 
            .filter(UserNotification.notification == notification).all()
 
                unotification = UserNotification.query() \
 
                    .filter(UserNotification.notification == notification).all()
 

	
 
        assert len(unotification) == len(usrs)
 
        assert set([x.user_id for x in unotification]) == set(usrs)
 
                assert len(unotification) == len(usrs)
 
                assert set([x.user_id for x in unotification]) == set(usrs)
 

	
 
    def test_user_notifications(self):
 
        notification1 = NotificationModel().create(created_by=self.u1,
 
                                            subject=u'subj', body=u'hi there1',
 
                                            recipients=[self.u3])
 
        Session().commit()
 
        notification2 = NotificationModel().create(created_by=self.u1,
 
                                            subject=u'subj', body=u'hi there2',
 
                                            recipients=[self.u3])
 
        Session().commit()
 
        u3 = Session().query(User).get(self.u3)
 
        with test_context(self.app):
 
            notification1 = NotificationModel().create(created_by=self.u1,
 
                                                subject=u'subj', body=u'hi there1',
 
                                                recipients=[self.u3])
 
            Session().commit()
 
            notification2 = NotificationModel().create(created_by=self.u1,
 
                                                subject=u'subj', body=u'hi there2',
 
                                                recipients=[self.u3])
 
            Session().commit()
 
            u3 = Session().query(User).get(self.u3)
 

	
 
        assert sorted([x.notification for x in u3.notifications]) == sorted([notification2, notification1])
 
            assert sorted([x.notification for x in u3.notifications]) == sorted([notification2, notification1])
 

	
 
    def test_delete_notifications(self):
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 
        notifications = Notification.query().all()
 
        assert notification in notifications
 
        with test_context(self.app):
 
            notification = NotificationModel().create(created_by=self.u1,
 
                                               subject=u'title', body=u'hi there3',
 
                                        recipients=[self.u3, self.u1, self.u2])
 
            Session().commit()
 
            notifications = Notification.query().all()
 
            assert notification in notifications
 

	
 
        Notification.delete(notification.notification_id)
 
        Session().commit()
 
            Notification.delete(notification.notification_id)
 
            Session().commit()
 

	
 
        notifications = Notification.query().all()
 
        assert not notification in notifications
 
            notifications = Notification.query().all()
 
            assert not notification in notifications
 

	
 
        un = UserNotification.query().filter(UserNotification.notification
 
                                             == notification).all()
 
        assert un == []
 
            un = UserNotification.query().filter(UserNotification.notification
 
                                                 == notification).all()
 
            assert un == []
 

	
 
    def test_delete_association(self):
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 
        with test_context(self.app):
 
            notification = NotificationModel().create(created_by=self.u1,
 
                                               subject=u'title', body=u'hi there3',
 
                                        recipients=[self.u3, self.u1, self.u2])
 
            Session().commit()
 

	
 
        unotification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u3) \
 
                            .scalar()
 
            unotification = UserNotification.query() \
 
                                .filter(UserNotification.notification ==
 
                                        notification) \
 
                                .filter(UserNotification.user_id == self.u3) \
 
                                .scalar()
 

	
 
        assert unotification.user_id == self.u3
 
            assert unotification.user_id == self.u3
 

	
 
        NotificationModel().delete(self.u3,
 
                                   notification.notification_id)
 
        Session().commit()
 
            NotificationModel().delete(self.u3,
 
                                       notification.notification_id)
 
            Session().commit()
 

	
 
        u3notification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u3) \
 
                            .scalar()
 
            u3notification = UserNotification.query() \
 
                                .filter(UserNotification.notification ==
 
                                        notification) \
 
                                .filter(UserNotification.user_id == self.u3) \
 
                                .scalar()
 

	
 
        assert u3notification == None
 
            assert u3notification == None
 

	
 
        # notification object is still there
 
        assert Notification.query().all() == [notification]
 
            # notification object is still there
 
            assert Notification.query().all() == [notification]
 

	
 
        #u1 and u2 still have assignments
 
        u1notification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u1) \
 
                            .scalar()
 
        assert u1notification != None
 
        u2notification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u2) \
 
                            .scalar()
 
        assert u2notification != None
 
            #u1 and u2 still have assignments
 
            u1notification = UserNotification.query() \
 
                                .filter(UserNotification.notification ==
 
                                        notification) \
 
                                .filter(UserNotification.user_id == self.u1) \
 
                                .scalar()
 
            assert u1notification != None
 
            u2notification = UserNotification.query() \
 
                                .filter(UserNotification.notification ==
 
                                        notification) \
 
                                .filter(UserNotification.user_id == self.u2) \
 
                                .scalar()
 
            assert u2notification != None
 

	
 
    def test_notification_counter(self):
 
        NotificationModel().create(created_by=self.u1,
 
                            subject=u'title', body=u'hi there_delete',
 
                            recipients=[self.u3, self.u1])
 
        Session().commit()
 

	
 
        assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
 
        assert NotificationModel().get_unread_cnt_for_user(self.u2) == 0
 
        assert NotificationModel().get_unread_cnt_for_user(self.u3) == 1
 
        with test_context(self.app):
 
            NotificationModel().create(created_by=self.u1,
 
                                subject=u'title', body=u'hi there_delete',
 
                                recipients=[self.u3, self.u1])
 
            Session().commit()
 

	
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 
            assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
 
            assert NotificationModel().get_unread_cnt_for_user(self.u2) == 0
 
            assert NotificationModel().get_unread_cnt_for_user(self.u3) == 1
 

	
 
        assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
 
        assert NotificationModel().get_unread_cnt_for_user(self.u2) == 1
 
        assert NotificationModel().get_unread_cnt_for_user(self.u3) == 2
 
            notification = NotificationModel().create(created_by=self.u1,
 
                                               subject=u'title', body=u'hi there3',
 
                                        recipients=[self.u3, self.u1, self.u2])
 
            Session().commit()
 

	
 
            assert NotificationModel().get_unread_cnt_for_user(self.u1) == 0
 
            assert NotificationModel().get_unread_cnt_for_user(self.u2) == 1
 
            assert NotificationModel().get_unread_cnt_for_user(self.u3) == 2
 

	
 
    @mock.patch.object(h, 'canonical_url', (lambda arg, **kwargs: 'http://%s/?%s' % (arg, '&'.join('%s=%s' % (k, v) for (k, v) in sorted(kwargs.items())))))
 
    def test_dump_html_mails(self):
 
        # Exercise all notification types and dump them to one big html file
 
        l = []
 

	
 
        def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
            l.append('<hr/>\n')
 
            l.append('<h1>%s</h1>\n' % desc) # desc is from outer scope
 
            l.append('<pre>\n')
 
            l.append('From: %s\n' % author.username)
 
            l.append('To: %s\n' % ' '.join(recipients))
 
            l.append('Subject: %s\n' % subject)
 
            l.append('</pre>\n')
 
            l.append('<hr/>\n')
 
            l.append('<pre>%s</pre>\n' % body)
 
            l.append('<hr/>\n')
 
            l.append(html_body)
 
            l.append('<hr/>\n')
 

	
 
        with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
            pr_kwargs = dict(
 
                pr_nice_id='#7',
 
                pr_title='The Title',
 
                pr_title_short='The Title',
 
                pr_url='http://pr.org/7',
 
                pr_target_repo='http://mainline.com/repo',
 
                pr_target_branch='trunk',
 
                pr_source_repo='https://dev.org/repo',
 
                pr_source_branch='devbranch',
 
                pr_owner=User.get(self.u2),
 
                pr_owner_username='u2'
 
                )
 
        with test_context(self.app):
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                pr_kwargs = dict(
 
                    pr_nice_id='#7',
 
                    pr_title='The Title',
 
                    pr_title_short='The Title',
 
                    pr_url='http://pr.org/7',
 
                    pr_target_repo='http://mainline.com/repo',
 
                    pr_target_branch='trunk',
 
                    pr_source_repo='https://dev.org/repo',
 
                    pr_source_branch='devbranch',
 
                    pr_owner=User.get(self.u2),
 
                    pr_owner_username='u2'
 
                    )
 

	
 
            for type_, body, kwargs in [
 
                (Notification.TYPE_CHANGESET_COMMENT,
 
                 u'This is the new comment.\n\n - and here it ends indented.',
 
                 dict(
 
                    short_id='cafe1234',
 
                    raw_id='cafe1234c0ffeecafe',
 
                    branch='brunch',
 
                    cs_comment_user='Opinionated User (jsmith)',
 
                    cs_comment_url='http://comment.org',
 
                    is_mention=[False, True],
 
                    message='This changeset did something clever which is hard to explain',
 
                    message_short='This changeset did something cl...',
 
                    status_change=[None, 'Approved'],
 
                    cs_target_repo='repo_target',
 
                    cs_url='http://changeset.com',
 
                    cs_author=User.get(self.u2))),
 
                (Notification.TYPE_MESSAGE,
 
                 u'This is the body of the test message\n - nothing interesting here except indentation.',
 
                 dict()),
 
                #(Notification.TYPE_MENTION, '$body', None), # not used
 
                (Notification.TYPE_REGISTRATION,
 
                 u'Registration body',
 
                 dict(
 
                    new_username='newbie',
 
                    registered_user_url='http://newbie.org',
 
                    new_email='new@email.com',
 
                    new_full_name='New Full Name')),
 
                (Notification.TYPE_PULL_REQUEST,
 
                 u'This PR is awesome because it does stuff\n - please approve indented!',
 
                 dict(
 
                    pr_user_created='Requesting User (root)', # pr_owner should perhaps be used for @mention in description ...
 
                    is_mention=[False, True],
 
                    pr_revisions=[('123abc'*7, "Introduce one and two\n\nand that's it"), ('567fed'*7, 'Make one plus two equal tree')],
 
                    org_repo_name='repo_org',
 
                    **pr_kwargs)),
 
                (Notification.TYPE_PULL_REQUEST_COMMENT,
 
                 u'Me too!\n\n - and indented on second line',
 
                 dict(
 
                    closing_pr=[False, True],
 
                    is_mention=[False, True],
 
                    pr_comment_user='Opinionated User (jsmith)',
 
                    pr_comment_url='http://pr.org/comment',
 
                    status_change=[None, 'Under Review'],
 
                    **pr_kwargs)),
 
                ]:
 
                kwargs['repo_name'] = u'repo/name'
 
                params = [(type_, type_, body, kwargs)]
 
                for param_name in ['is_mention', 'status_change', 'closing_pr']: # TODO: inline/general
 
                    if not isinstance(kwargs.get(param_name), list):
 
                        continue
 
                    new_params = []
 
                    for v in kwargs[param_name]:
 
                        for desc, type_, body, kwargs in params:
 
                            kwargs = dict(kwargs)
 
                            kwargs[param_name] = v
 
                            new_params.append(('%s, %s=%r' % (desc, param_name, v), type_, body, kwargs))
 
                    params = new_params
 
                for type_, body, kwargs in [
 
                    (Notification.TYPE_CHANGESET_COMMENT,
 
                     u'This is the new comment.\n\n - and here it ends indented.',
 
                     dict(
 
                        short_id='cafe1234',
 
                        raw_id='cafe1234c0ffeecafe',
 
                        branch='brunch',
 
                        cs_comment_user='Opinionated User (jsmith)',
 
                        cs_comment_url='http://comment.org',
 
                        is_mention=[False, True],
 
                        message='This changeset did something clever which is hard to explain',
 
                        message_short='This changeset did something cl...',
 
                        status_change=[None, 'Approved'],
 
                        cs_target_repo='repo_target',
 
                        cs_url='http://changeset.com',
 
                        cs_author=User.get(self.u2))),
 
                    (Notification.TYPE_MESSAGE,
 
                     u'This is the body of the test message\n - nothing interesting here except indentation.',
 
                     dict()),
 
                    #(Notification.TYPE_MENTION, '$body', None), # not used
 
                    (Notification.TYPE_REGISTRATION,
 
                     u'Registration body',
 
                     dict(
 
                        new_username='newbie',
 
                        registered_user_url='http://newbie.org',
 
                        new_email='new@email.com',
 
                        new_full_name='New Full Name')),
 
                    (Notification.TYPE_PULL_REQUEST,
 
                     u'This PR is awesome because it does stuff\n - please approve indented!',
 
                     dict(
 
                        pr_user_created='Requesting User (root)', # pr_owner should perhaps be used for @mention in description ...
 
                        is_mention=[False, True],
 
                        pr_revisions=[('123abc'*7, "Introduce one and two\n\nand that's it"), ('567fed'*7, 'Make one plus two equal tree')],
 
                        org_repo_name='repo_org',
 
                        **pr_kwargs)),
 
                    (Notification.TYPE_PULL_REQUEST_COMMENT,
 
                     u'Me too!\n\n - and indented on second line',
 
                     dict(
 
                        closing_pr=[False, True],
 
                        is_mention=[False, True],
 
                        pr_comment_user='Opinionated User (jsmith)',
 
                        pr_comment_url='http://pr.org/comment',
 
                        status_change=[None, 'Under Review'],
 
                        **pr_kwargs)),
 
                    ]:
 
                    kwargs['repo_name'] = u'repo/name'
 
                    params = [(type_, type_, body, kwargs)]
 
                    for param_name in ['is_mention', 'status_change', 'closing_pr']: # TODO: inline/general
 
                        if not isinstance(kwargs.get(param_name), list):
 
                            continue
 
                        new_params = []
 
                        for v in kwargs[param_name]:
 
                            for desc, type_, body, kwargs in params:
 
                                kwargs = dict(kwargs)
 
                                kwargs[param_name] = v
 
                                new_params.append(('%s, %s=%r' % (desc, param_name, v), type_, body, kwargs))
 
                        params = new_params
 

	
 
                for desc, type_, body, kwargs in params:
 
                    # desc is used as "global" variable
 
                    notification = NotificationModel().create(created_by=self.u1,
 
                                                       subject=u'unused', body=body, email_kwargs=kwargs,
 
                                                       recipients=[self.u2], type_=type_)
 
                    for desc, type_, body, kwargs in params:
 
                        # desc is used as "global" variable
 
                        notification = NotificationModel().create(created_by=self.u1,
 
                                                           subject=u'unused', body=body, email_kwargs=kwargs,
 
                                                           recipients=[self.u2], type_=type_)
 

	
 
            # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
 
            desc = 'TYPE_PASSWORD_RESET'
 
            kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
 
            kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
 
                "Password reset link",
 
                EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
 
                EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
 
                author=User.get(self.u1))
 
                # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
 
                desc = 'TYPE_PASSWORD_RESET'
 
                kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
 
                kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
 
                    "Password reset link",
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
 
                    author=User.get(self.u1))
 

	
 
        out = '<!doctype html>\n<html lang="en">\n<head><title>Notifications</title><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head>\n<body>\n%s\n</body>\n</html>\n' % \
 
            re.sub(r'<(/?(?:!doctype|html|head|title|meta|body)\b[^>]*)>', r'<!--\1-->', ''.join(l))
 

	
 
        outfn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.out.html')
 
        reffn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.ref.html')
 
        with file(outfn, 'w') as f:
 
            f.write(out)
 
        with file(reffn) as f:
 
            ref = f.read()
 
        assert ref == out # copy test_dump_html_mails.out.html to test_dump_html_mails.ref.html to update expectations
 
        os.unlink(outfn)
kallithea/tests/other/test_libs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.other.test_libs
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Package for testing various lib/helper functions in kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 9, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import hashlib
 
import mock
 
from kallithea.tests.base import *
 
from kallithea.lib.utils2 import AttributeDict
 
from kallithea.model.db import Repository
 
from kallithea.tests.test_context import test_context
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
 
     '%s://example.com' % proto),
 
    ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
 
                                                '8080'],
 
     '%s://example.com:8080' % proto),
 
]
 

	
 
proto = 'https'
 
TEST_URLS += [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
 
     '%s://example.com' % proto),
 
    ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
 
                                                '8080'],
 
     '%s://example.com:8080' % proto),
 
]
 

	
 
class FakeUrlGenerator(object):
 

	
 
    def __init__(self, current_url=None, default_route=None, **routes):
 
        """Initialize using specified 'current' URL template,
 
        default route template, and all other aguments describing known
 
        routes (format: route=template)"""
 
        self.current_url = current_url
 
        self.default_route = default_route
 
        self.routes = routes
 

	
 
    def __call__(self, route_name, *args, **kwargs):
 
        if route_name in self.routes:
 
            return self.routes[route_name] % kwargs
 
@@ -109,141 +110,144 @@ class TestLibs(TestController):
 
                           ('Y', True),
 
                           ('TRUE', True),
 
                           ('T', True),
 
                           ('False', False),
 
                           ('F', False),
 
                           ('FALSE', False),
 
                           ('0', False),
 
                           ('-1', False),
 
                           ('', False)
 
    ])
 
    def test_str2bool(self, str_bool, expected):
 
        from kallithea.lib.utils2 import str2bool
 
        assert str2bool(str_bool) == expected
 

	
 
    def test_mention_extractor(self):
 
        from kallithea.lib.utils2 import extract_mentioned_usernames
 
        sample = (
 
            "@first hi there @world here's my email username@example.com "
 
            "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
 
            "@UPPER    @cAmEL @2one_more22 @john please see this http://org.pl "
 
            "@marian.user just do it @marco-polo and next extract @marco_polo "
 
            "user.dot  hej ! not-needed maril@example.com"
 
        )
 

	
 
        expected = set([
 
            '2one_more22', 'first', 'lukaszb', 'one', 'one_more22', 'UPPER', 'cAmEL', 'john',
 
            'marian.user', 'marco-polo', 'marco_polo', 'world'])
 
        assert expected == set(extract_mentioned_usernames(sample))
 

	
 
    @parametrize('age_args,expected', [
 
        (dict(), u'just now'),
 
        (dict(seconds= -1), u'1 second ago'),
 
        (dict(seconds= -60 * 2), u'2 minutes ago'),
 
        (dict(hours= -1), u'1 hour ago'),
 
        (dict(hours= -24), u'1 day ago'),
 
        (dict(hours= -24 * 5), u'5 days ago'),
 
        (dict(months= -1), u'1 month ago'),
 
        (dict(months= -1, days= -2), u'1 month and 2 days ago'),
 
        (dict(months= -1, days= -20), u'1 month and 19 days ago'),
 
        (dict(years= -1, months= -1), u'1 year and 1 month ago'),
 
        (dict(years= -1, months= -10), u'1 year and 10 months ago'),
 
        (dict(years= -2, months= -4), u'2 years and 4 months ago'),
 
        (dict(years= -2, months= -11), u'2 years and 11 months ago'),
 
        (dict(years= -3, months= -2), u'3 years and 2 months ago'),
 
    ])
 
    def test_age(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        n = datetime.datetime(year=2012, month=5, day=17)
 
        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
        assert age(n + delt(**age_args), now=n) == expected
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    @parametrize('age_args,expected', [
 
        (dict(), u'just now'),
 
        (dict(seconds= -1), u'1 second ago'),
 
        (dict(seconds= -60 * 2), u'2 minutes ago'),
 
        (dict(hours= -1), u'1 hour ago'),
 
        (dict(hours= -24), u'1 day ago'),
 
        (dict(hours= -24 * 5), u'5 days ago'),
 
        (dict(months= -1), u'1 month ago'),
 
        (dict(months= -1, days= -2), u'1 month ago'),
 
        (dict(months= -1, days= -20), u'1 month ago'),
 
        (dict(years= -1, months= -1), u'13 months ago'),
 
        (dict(years= -1, months= -10), u'22 months ago'),
 
        (dict(years= -2, months= -4), u'2 years ago'),
 
        (dict(years= -2, months= -11), u'3 years ago'),
 
        (dict(years= -3, months= -2), u'3 years ago'),
 
        (dict(years= -4, months= -8), u'5 years ago'),
 
    ])
 
    def test_age_short(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        n = datetime.datetime(year=2012, month=5, day=17)
 
        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
        assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
 

	
 
    @parametrize('age_args,expected', [
 
        (dict(), u'just now'),
 
        (dict(seconds=1), u'in 1 second'),
 
        (dict(seconds=60 * 2), u'in 2 minutes'),
 
        (dict(hours=1), u'in 1 hour'),
 
        (dict(hours=24), u'in 1 day'),
 
        (dict(hours=24 * 5), u'in 5 days'),
 
        (dict(months=1), u'in 1 month'),
 
        (dict(months=1, days=1), u'in 1 month and 1 day'),
 
        (dict(years=1, months=1), u'in 1 year and 1 month')
 
    ])
 
    def test_age_in_future(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        n = datetime.datetime(year=2012, month=5, day=17)
 
        delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
        assert age(n + delt(**age_args), now=n) == expected
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    def test_tag_extractor(self):
 
        sample = (
 
            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
 
            "[requires] [stale] [see<>=>] [see => http://example.com]"
 
            "[requires => url] [lang => python] [just a tag]"
 
            "[,d] [ => ULR ] [obsolete] [desc]]"
 
        )
 
        from kallithea.lib.helpers import urlify_text
 
        res = urlify_text(sample, stylize=True)
 
        assert '<div class="metatag" tag="tag">tag</div>' in res
 
        assert '<div class="metatag" tag="obsolete">obsolete</div>' in res
 
        assert '<div class="metatag" tag="stale">stale</div>' in res
 
        assert '<div class="metatag" tag="lang">python</div>' in res
 
        assert '<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res
 
        assert '<div class="metatag" tag="tag">tag</div>' in res
 

	
 
    def test_alternative_gravatar(self):
 
        from kallithea.lib.helpers import gravatar_url
 
        _md5 = lambda s: hashlib.md5(s).hexdigest()
 

	
 
        #mock pylons.tmpl_context
 
        def fake_tmpl_context(_url):
 
            _c = AttributeDict()
 
            _c.visual = AttributeDict()
 
            _c.visual.use_gravatar = True
 
            _c.visual.gravatar_url = _url
 

	
 
            return _c
 

	
 
        fake_url = FakeUrlGenerator(current_url='https://example.com')
 
        with mock.patch('kallithea.config.routing.url', fake_url):
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('pylons.tmpl_context', fake):
 
                    from kallithea.config.routing import url
 
                    assert url.current() == 'https://example.com'
 
                    grav = gravatar_url(email_address='test@example.com', size=24)
 
                    assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('pylons.tmpl_context', fake):
 
                grav = gravatar_url(email_address='test@example.com', size=24)
 
                assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}')
 
            with mock.patch('pylons.tmpl_context', fake):
 
                em = 'test@example.com'
 
                grav = gravatar_url(email_address=em, size=24)
0 comments (0 inline, 0 general)