Changeset - fed129fb8533
[Not reviewed]
default
0 5 0
Thomas De Schampheleire - 10 years ago 2016-05-14 21:59:47
thomas.de.schampheleire@gmail.com
pytest migration: backout declassification of remove_all_notifications

In order to accomodate both nose/unittest and pytest at the same time,
method remove_all_notifications has been extracted from BaseTestCase in
commit 37d713674f63.
Now that nose/unittest is no longer used, we can backout this change again.
5 files changed with 13 insertions and 13 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 

	
 
Refer to docs/contributing.rst for details on running the test suite.
 
"""
 
import os
 
import re
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from tempfile import _RandomNameSequence
 

	
 
import pylons
 
import pylons.test
 
from pylons import config, url
 
from pylons.i18n.translation import _get_translator
 
from pylons.util import ContextObj
 

	
 
from routes.util import URLGenerator
 
from webtest import TestApp
 
import pytest
 

	
 
from kallithea.lib.compat import unittest
 
from kallithea import is_windows
 
from kallithea.model.db import Notification, User, UserNotification
 
from kallithea.model.meta import Session
 
from kallithea.tests.parameterized import parameterized
 
from kallithea.lib.utils2 import safe_str
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
skipif = pytest.mark.skipif
 
parametrize = pytest.mark.parametrize
 

	
 
__all__ = [
 
    'skipif', 'parametrize', 'environ', 'url', 'TestControllerPytest',
 
    'ldap_lib_installed', 'pam_lib_installed', 'init_stack',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_ADMIN_EMAIL', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS', 'remove_all_notifications',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn(tempfile.gettempdir(), 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@example.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@example.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@example.com'
 

	
 
HG_REPO = u'vcs_test_hg'
 
GIT_REPO = u'vcs_test_git'
 

	
 
NEW_HG_REPO = u'vcs_test_hg_new'
 
NEW_GIT_REPO = u'vcs_test_git_new'
 

	
 
HG_FORK = u'vcs_test_hg_fork'
 
GIT_FORK = u'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 
#skip ldap tests if LDAP lib is not installed
 
ldap_lib_installed = False
 
try:
 
    import ldap
 
    ldap.API_VERSION
 
    ldap_lib_installed = True
 
except ImportError:
 
    # means that python-ldap is not installed
 
    pass
 

	
 
try:
 
    import pam
 
    pam.PAM_TEXT_INFO
 
    pam_lib_installed = True
 
except ImportError:
 
    pam_lib_installed = False
 

	
 
class NullHandler(logging.Handler):
 
    def emit(self, record):
 
        pass
 

	
 
def init_stack(config=None):
 
    if not config:
 
        config = pylons.test.pylonsapp.config
 
    url._push_object(URLGenerator(config['routes.map'], environ))
 
    pylons.app_globals._push_object(config['pylons.app_globals'])
 
    pylons.config._push_object(config)
 
    pylons.tmpl_context._push_object(ContextObj())
 
    # Initialize a translator for tests that utilize i18n
 
    translator = _get_translator(pylons.config.get('lang'))
 
    pylons.translator._push_object(translator)
 
    h = NullHandler()
 
    logging.getLogger("kallithea").addHandler(h)
 

	
 
def remove_all_notifications():
 
    Notification.query().delete()
 

	
 
    # Because query().delete() does not (by default) trigger cascades.
 
    # http://docs.sqlalchemy.org/en/rel_0_7/orm/collections.html#passive-deletes
 
    UserNotification.query().delete()
 

	
 
    Session().commit()
 

	
 
class TestControllerPytest(object):
 
    """Pytest-style test controller"""
 

	
 
    # Note: pytest base classes cannot have an __init__ method
 

	
 
    @pytest.fixture(autouse=True)
 
    def app_fixture(self):
 
        self.wsgiapp = pylons.test.pylonsapp
 
        init_stack(self.wsgiapp.config)
 
        self.app = TestApp(self.wsgiapp)
 
        self.maxDiff = None
 
        self.index_location = config['app_conf']['index_dir']
 
        return self.app
 

	
 
    def remove_all_notifications(self):
 
        Notification.query().delete()
 

	
 
        # Because query().delete() does not (by default) trigger cascades.
 
        # http://docs.sqlalchemy.org/en/rel_0_7/orm/collections.html#passive-deletes
 
        UserNotification.query().delete()
 
        Session().commit()
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'Invalid username or password' in response.body:
 
            pytest.fail('could not login using %s %s' % (username, password))
 

	
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, username)
 

	
 
        response = response.follow()
 
        return response.session['authuser']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def assert_authenticated_user(self, response, expected_username):
 
        cookie = response.session.get('authuser')
 
        user = cookie and cookie.get('user_id')
 
        user = user and User.get(user)
 
        user = user and user.username
 
        assert user == expected_username
 

	
 
    def authentication_token(self):
 
        return self.app.get(url('authentication_token')).body
 

	
 
    def checkSessionFlash(self, response, msg=None, skip=0, _matcher=lambda msg, m: msg in m):
 
        if 'flash' not in response.session:
 
            pytest.fail(safe_str(u'msg `%s` not found - session has no flash:\n%s' % (msg, response)))
 
        try:
 
            level, m = response.session['flash'][-1 - skip]
 
            if _matcher(msg, m):
 
                return
 
        except IndexError:
 
            pass
 
        pytest.fail(safe_str(u'msg `%s` not found in session flash (skipping %s): %s' %
 
                           (msg, skip,
 
                            ', '.join('`%s`' % m for level, m in response.session['flash']))))
 

	
 
    def checkSessionFlashRegex(self, response, regex, skip=0):
 
        self.checkSessionFlash(response, regex, skip=skip, _matcher=re.search)
 

	
kallithea/tests/functional/test_admin_notifications.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.model.db import User
 

	
 
from kallithea.model.user import UserModel
 
from kallithea.model.notification import NotificationModel
 
from kallithea.model.meta import Session
 
from kallithea.lib import helpers as h
 

	
 

	
 
class TestNotificationsController(TestControllerPytest):
 
    def setup_method(self, method):
 
        remove_all_notifications()
 
        self.remove_all_notifications()
 

	
 
    def test_index(self):
 
        self.log_user()
 

	
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                          email='u1@example.com',
 
                                          firstname=u'u1', lastname=u'u1')
 
        u1 = u1.user_id
 
        Session().commit()
 

	
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('<div class="table">No notifications here yet</div>')
 

	
 
        cur_user = self._get_logged_user()
 
        notif = NotificationModel().create(created_by=u1, subject=u'test_notification_1',
 
                                           body=u'notification_1', recipients=[cur_user])
 
        Session().commit()
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('id="notification_%s"' % notif.notification_id)
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 

	
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                               email='u1@example.com',
 
                                               firstname=u'u1', lastname=u'u1')
 
        u2 = UserModel().create_or_update(username='u2', password='qweqwe',
 
                                               email='u2@example.com',
 
                                               firstname=u'u2', lastname=u'u2')
 

	
 
        # make notifications
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=u'test',
 
                                                  body=u'hi there',
 
                                                  recipients=[cur_user, u1, u2])
 
        Session().commit()
 
        u1 = User.get(u1.user_id)
 
        u2 = User.get(u2.user_id)
 

	
 
        # check DB
 
        get_notif = lambda un: [x.notification for x in un]
 
        assert get_notif(cur_user.notifications) == [notification]
 
        assert get_notif(u1.notifications) == [notification]
 
        assert get_notif(u2.notifications) == [notification]
 
        cur_usr_id = cur_user.user_id
 

	
 
        response = self.app.post(
 
            url('notification', notification_id=notification.notification_id),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        assert response.body == 'ok'
 

	
 
        cur_user = User.get(cur_usr_id)
 
        assert cur_user.notifications == []
 

	
 
    def test_show(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                          email='u1@example.com',
 
                                          firstname=u'u1', lastname=u'u1')
 
        u2 = UserModel().create_or_update(username='u2', password='qweqwe',
 
                                          email='u2@example.com',
 
                                          firstname=u'u2', lastname=u'u2')
 

	
 
        subject = u'test'
 
        notif_body = u'hi there'
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=subject,
 
                                                  body=notif_body,
 
                                                  recipients=[cur_user, u1, u2])
 

	
 
        response = self.app.get(url('notification',
 
                                    notification_id=notification.notification_id))
 

	
 
        response.mustcontain(subject)
 
        response.mustcontain(notif_body)
 

	
 
    def test_description_with_age(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 

	
 
        description = NotificationModel().make_description(notification)
 
        assert description == "{0} sent message {1}".format(
 
                cur_user.username,
 
                h.age(notification.created_on)
 
                )
 

	
 
    def test_description_with_datetime(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
kallithea/tests/functional/test_changeset_comments.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.model.db import ChangesetComment, Notification, \
 
    UserNotification
 
from kallithea.model.meta import Session
 

	
 

	
 
class TestChangeSetCommentsController(TestControllerPytest):
 

	
 
    def setup_method(self, method):
 
        for x in ChangesetComment.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
        remove_all_notifications()
 
        self.remove_all_notifications()
 

	
 
    def test_create(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        assert ChangesetComment.query().count() == 1
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 

	
 
        assert Notification.query().count() == 1
 
        assert ChangesetComment.query().count() == 1
 

	
 
        notification = Notification.query().all()[0]
 

	
 
        ID = ChangesetComment.query().first().comment_id
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        assert sbj in notification.subject
 

	
 
    def test_create_inline(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 
        f_path = 'vcs/web/simplevcs/views/repository.py'
 
        line = 'n1'
 

	
 
        params = {'text': text, 'f_path': f_path, 'line': line, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        #test DB
 
        assert ChangesetComment.query().count() == 1
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (1 inline, 0 general)'''
 
        )
 
        response.mustcontain(
 
            '''<div class="comments-list-chunk" '''
 
            '''data-f_path="vcs/web/simplevcs/views/repository.py" '''
 
            '''data-line_no="n1" data-target-id="vcswebsimplevcsviewsrepositorypy_n1">'''
 
        )
 

	
 
        assert Notification.query().count() == 1
 
        assert ChangesetComment.query().count() == 1
 

	
 
        notification = Notification.query().all()[0]
 
        ID = ChangesetComment.query().first().comment_id
 
        assert notification.type_ == Notification.TYPE_CHANGESET_COMMENT
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        assert sbj in notification.subject
 

	
 
    def test_create_with_mention(self):
 
        self.log_user()
 

	
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'@%s check CommentOnRevision' % TEST_USER_REGULAR_LOGIN
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params, extra_environ={'HTTP_X_PARTIAL_XHR': '1'})
 
        # Test response...
 
        assert response.status == '200 OK'
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        assert ChangesetComment.query().count() == 1
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 

	
 
        assert Notification.query().count() == 2
kallithea/tests/functional/test_login.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import re
 
import time
 
import urlparse
 

	
 
import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.utils2 import generate_api_key
 
from kallithea.lib.auth import check_password
 
from kallithea.lib import helpers as h
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model import validators
 
from kallithea.model.db import User, Notification
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestLoginController(TestControllerPytest):
 
    def setup_method(self, method):
 
        remove_all_notifications()
 
        self.remove_all_notifications()
 
        assert Notification.query().all() == []
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        assert response.status == '200 OK'
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, TEST_USER_ADMIN_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_regular_email_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_EMAIL,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, TEST_USER_REGULAR_LOGIN)
 

	
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        assert response.status == '302 Found'
 
        response = response.follow()
 

	
 
        assert response.status == '200 OK'
 
        response.mustcontain('Users Administration')
 

	
 
    def test_login_do_not_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': False})
 

	
 
        assert 'Set-Cookie' in response.headers
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            assert not re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r has expiration date, but should be a session cookie' % cookie
 

	
 
    def test_login_remember(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS,
 
                                  'remember': True})
 

	
 
        assert 'Set-Cookie' in response.headers
 
        for cookie in response.headers.getall('Set-Cookie'):
 
            assert re.search(r';\s+(Max-Age|Expires)=', cookie, re.IGNORECASE), 'Cookie %r should have expiration date, but is a session cookie' % cookie
 

	
 
    def test_logout(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        # Verify that a login session has been established.
 
        response = self.app.get(url(controller='login', action='index'))
 
        response = response.follow()
 
        assert 'authuser' in response.session
 

	
 
        response.click('Log Out')
 

	
 
        # Verify that the login session has been terminated.
 
        response = self.app.get(url(controller='login', action='index'))
 
        assert 'authuser' not in response.session
 

	
 
    @parametrize('url_came_from', [
 
          ('data:text/html,<script>window.alert("xss")</script>',),
 
          ('mailto:test@example.com',),
 
          ('file:///etc/passwd',),
 
          ('ftp://ftp.example.com',),
 
          ('http://other.example.com/bl%C3%A5b%C3%A6rgr%C3%B8d',),
 
          ('//evil.example.com/',),
 
          ('/\r\nX-Header-Injection: boo',),
 
          ('/invälid_url_bytes',),
kallithea/tests/models/test_notifications.py
Show inline comments
 
from kallithea.tests import *
 

	
 
from kallithea.model.db import User, Notification, UserNotification
 
from kallithea.model.user import UserModel
 

	
 
from kallithea.model.meta import Session
 
from kallithea.model.notification import NotificationModel
 

	
 

	
 
class TestNotifications(TestControllerPytest):
 

	
 
    def setup_method(self, method):
 
        Session.remove()
 
        self.u1 = UserModel().create_or_update(username=u'u1',
 
                                        password=u'qweqwe',
 
                                        email=u'u1@example.com',
 
                                        firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        self.u1 = self.u1.user_id
 

	
 
        self.u2 = UserModel().create_or_update(username=u'u2',
 
                                        password=u'qweqwe',
 
                                        email=u'u2@example.com',
 
                                        firstname=u'u2', lastname=u'u3')
 
        Session().commit()
 
        self.u2 = self.u2.user_id
 

	
 
        self.u3 = UserModel().create_or_update(username=u'u3',
 
                                        password=u'qweqwe',
 
                                        email=u'u3@example.com',
 
                                        firstname=u'u3', lastname=u'u3')
 
        Session().commit()
 
        self.u3 = self.u3.user_id
 

	
 
        remove_all_notifications()
 
        self.remove_all_notifications()
 
        assert [] == Notification.query().all()
 
        assert [] == UserNotification.query().all()
 

	
 
    def test_create_notification(self):
 
        usrs = [self.u1, self.u2]
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'subj', body=u'hi there',
 
                                           recipients=usrs)
 
        Session().commit()
 
        u1 = User.get(self.u1)
 
        u2 = User.get(self.u2)
 
        u3 = User.get(self.u3)
 
        notifications = Notification.query().all()
 
        assert len(notifications) == 1
 

	
 
        assert notifications[0].recipients == [u1, u2]
 
        assert notification.notification_id == notifications[0].notification_id
 

	
 
        unotification = UserNotification.query() \
 
            .filter(UserNotification.notification == notification).all()
 

	
 
        assert len(unotification) == len(usrs)
 
        assert set([x.user.user_id for x in unotification]) == set(usrs)
 

	
 
    def test_user_notifications(self):
 
        notification1 = NotificationModel().create(created_by=self.u1,
 
                                            subject=u'subj', body=u'hi there1',
 
                                            recipients=[self.u3])
 
        Session().commit()
 
        notification2 = NotificationModel().create(created_by=self.u1,
 
                                            subject=u'subj', body=u'hi there2',
 
                                            recipients=[self.u3])
 
        Session().commit()
 
        u3 = Session().query(User).get(self.u3)
 

	
 
        assert sorted([x.notification for x in u3.notifications]) == sorted([notification2, notification1])
 

	
 
    def test_delete_notifications(self):
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 
        notifications = Notification.query().all()
 
        assert notification in notifications
 

	
 
        Notification.delete(notification.notification_id)
 
        Session().commit()
 

	
 
        notifications = Notification.query().all()
 
        assert not notification in notifications
 

	
 
        un = UserNotification.query().filter(UserNotification.notification
 
                                             == notification).all()
 
        assert un == []
 

	
 
    def test_delete_association(self):
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 

	
 
        unotification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u3) \
 
                            .scalar()
 

	
 
        assert unotification.user_id == self.u3
 

	
 
        NotificationModel().delete(self.u3,
 
                                   notification.notification_id)
 
        Session().commit()
 

	
 
        u3notification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u3) \
 
                            .scalar()
 

	
 
        assert u3notification == None
 

	
 
        # notification object is still there
 
        assert Notification.query().all() == [notification]
 

	
 
        #u1 and u2 still have assignments
 
        u1notification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u1) \
 
                            .scalar()
 
        assert u1notification != None
 
        u2notification = UserNotification.query() \
 
                            .filter(UserNotification.notification ==
 
                                    notification) \
 
                            .filter(UserNotification.user_id == self.u2) \
 
                            .scalar()
0 comments (0 inline, 0 general)