Changeset - 9b2c5e8b37ea
[Not reviewed]
default
0 5 0
Søren Løvborg - 10 years ago 2015-07-01 18:28:28
kwi@kwi.dk
notification tests: delete notifications before (not after) tests

Don't clean notifications (and changeset comments) *after* the test,
but *before* the test. Other unit tests don't care if they leave
notifications in the database, and neither should these. Rather,
they should ensure their *own* preconditions before testing.

Admittedly, currently only one test leaves a notification in the
database, but more could come along at any time (and why worry?):
TestPullrequestsController.test_create_with_existing_reviewer
5 files changed with 18 insertions and 51 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 

	
 
nosetests -x - fail on first error
 
nosetests kallithea.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
nosetests --pdb --pdb-failures
 
nosetests --with-coverage --cover-package=kallithea.model.validators kallithea.tests.test_validators
 

	
 
optional FLAGS:
 
    KALLITHEA_WHOOSH_TEST_DISABLE=1 - skip whoosh index building and tests
 
    KALLITHEA_NO_TMP_PATH=1 - disable new temp path for tests, used mostly for test_vcs_operations
 

	
 
"""
 
import os
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from tempfile import _RandomNameSequence
 

	
 
import pylons
 
import pylons.test
 
from pylons import config, url
 
from pylons.i18n.translation import _get_translator
 
from pylons.util import ContextObj
 

	
 
from routes.util import URLGenerator
 
from webtest import TestApp
 
from nose.plugins.skip import SkipTest
 

	
 
from kallithea.lib.compat import unittest
 
from kallithea import is_windows
 
from kallithea.model.db import User
 
from kallithea.model.db import Notification, User, UserNotification
 
from kallithea.model.meta import Session
 
from kallithea.tests.parameterized import parameterized
 
from kallithea.lib.utils2 import safe_str
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
 
    'SkipTest', 'ldap_lib_installed', 'BaseTestCase', 'init_stack',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_ADMIN_EMAIL', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 
#skip ldap tests if LDAP lib is not installed
 
ldap_lib_installed = False
 
try:
 
    import ldap
 
    ldap.API_VERSION
 
    ldap_lib_installed = True
 
except ImportError:
 
    # means that python-ldap is not installed
 
    pass
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from kallithea.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
import logging
 

	
 
class NullHandler(logging.Handler):
 
    def emit(self, record):
 
        pass
 

	
 
def init_stack(config=None):
 
    if not config:
 
        config = pylons.test.pylonsapp.config
 
    url._push_object(URLGenerator(config['routes.map'], environ))
 
    pylons.app_globals._push_object(config['pylons.app_globals'])
 
    pylons.config._push_object(config)
 
    pylons.tmpl_context._push_object(ContextObj())
 
    # Initialize a translator for tests that utilize i18n
 
    translator = _get_translator(pylons.config.get('lang'))
 
    pylons.translator._push_object(translator)
 
    h = NullHandler()
 
    logging.getLogger("kallithea").addHandler(h)
 

	
 

	
 
class BaseTestCase(unittest.TestCase):
 
    def __init__(self, *args, **kwargs):
 
        self.wsgiapp = pylons.test.pylonsapp
 
        init_stack(self.wsgiapp.config)
 
        unittest.TestCase.__init__(self, *args, **kwargs)
 

	
 
    def remove_all_notifications(self):
 
        Notification.query().delete()
 

	
 
        # Because query().delete() does not (by default) trigger cascades.
 
        # http://docs.sqlalchemy.org/en/rel_0_7/orm/collections.html#passive-deletes
 
        UserNotification.query().delete()
 

	
 
        Session().commit()
 

	
 

	
 
class TestController(BaseTestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        BaseTestCase.__init__(self, *args, **kwargs)
 
        self.app = TestApp(self.wsgiapp)
 
        self.maxDiff = None
 
        self.index_location = config['app_conf']['index_dir']
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['authuser']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['authuser']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def authentication_token(self):
 
        return self.app.get(url('authentication_token')).body
 

	
 
    def checkSessionFlash(self, response, msg, skip=0):
 
        if 'flash' not in response.session:
 
            self.fail(safe_str(u'msg `%s` not found - session has no flash ' % msg))
 
        try:
 
            level, m = response.session['flash'][-1 - skip]
 
            if msg in m:
 
                return
 
        except IndexError:
 
            pass
 
        self.fail(safe_str(u'msg `%s` not found in session flash (skipping %s): %s' %
 
                           (msg, skip,
 
                            ', '.join('`%s`' % m for level, m in response.session['flash']))))
kallithea/tests/functional/test_admin_notifications.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.model.db import Notification, User
 

	
 
from kallithea.model.user import UserModel
 
from kallithea.model.notification import NotificationModel
 
from kallithea.model.meta import Session
 
from kallithea.lib import helpers as h
 

	
 

	
 
class TestNotificationsController(TestController):
 

	
 
    def tearDown(self):
 
        for n in Notification.query().all():
 
            inst = Notification.get(n.notification_id)
 
            Session().delete(inst)
 
        Session().commit()
 
    def setUp(self):
 
        self.remove_all_notifications()
 

	
 
    def test_index(self):
 
        self.log_user()
 

	
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                          email='u1@example.com',
 
                                          firstname='u1', lastname='u1')
 
        u1 = u1.user_id
 

	
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('<div class="table">No notifications here yet</div>')
 

	
 
        cur_user = self._get_logged_user()
 
        notif = NotificationModel().create(created_by=u1, subject=u'test_notification_1',
 
                                           body=u'notification_1', recipients=[cur_user])
 
        Session().commit()
 
        response = self.app.get(url('notifications'))
 
        response.mustcontain('id="notification_%s"' % notif.notification_id)
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 

	
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                               email='u1@example.com',
 
                                               firstname='u1', lastname='u1')
 
        u2 = UserModel().create_or_update(username='u2', password='qweqwe',
 
                                               email='u2@example.com',
 
                                               firstname='u2', lastname='u2')
 

	
 
        # make notifications
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=u'test',
 
                                                  body=u'hi there',
 
                                                  recipients=[cur_user, u1, u2])
 
        Session().commit()
 
        u1 = User.get(u1.user_id)
 
        u2 = User.get(u2.user_id)
 

	
 
        # check DB
 
        get_notif = lambda un: [x.notification for x in un]
 
        self.assertEqual(get_notif(cur_user.notifications), [notification])
 
        self.assertEqual(get_notif(u1.notifications), [notification])
 
        self.assertEqual(get_notif(u2.notifications), [notification])
 
        cur_usr_id = cur_user.user_id
 

	
 
        response = self.app.delete(url('notification',
 
                                       notification_id=
 
                                       notification.notification_id))
 
        self.assertEqual(response.body, 'ok')
 

	
 
        cur_user = User.get(cur_usr_id)
 
        self.assertEqual(cur_user.notifications, [])
 

	
 
    def test_show(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        u1 = UserModel().create_or_update(username='u1', password='qweqwe',
 
                                          email='u1@example.com',
 
                                          firstname='u1', lastname='u1')
 
        u2 = UserModel().create_or_update(username='u2', password='qweqwe',
 
                                          email='u2@example.com',
 
                                          firstname='u2', lastname='u2')
 

	
 
        subject = u'test'
 
        notif_body = u'hi there'
 
        notification = NotificationModel().create(created_by=cur_user,
 
                                                  subject=subject,
 
                                                  body=notif_body,
 
                                                  recipients=[cur_user, u1, u2])
 

	
 
        response = self.app.get(url('notification',
 
                                    notification_id=notification.notification_id))
 

	
 
        response.mustcontain(subject)
 
        response.mustcontain(notif_body)
 

	
 
    def test_description_with_age(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 

	
 
        description = NotificationModel().make_description(notification)
 
        self.assertEqual(
 
            description,
 
            "{0} sent message {1}".format(
 
                cur_user.username,
 
                h.age(notification.created_on)
 
                )
 
            )
 

	
 
    def test_description_with_datetime(self):
 
        self.log_user()
 
        cur_user = self._get_logged_user()
 
        subject = u'test'
 
        notify_body = u'hi there'
 
        notification = NotificationModel().create(created_by = cur_user,
 
                                                  subject    = subject,
 
                                                  body       = notify_body)
 

	
 
        description = NotificationModel().make_description(notification, False)
 
        self.assertEqual(
 
            description,
 
            "{0} sent message at {1}".format(
 
                cur_user.username,
 
                h.fmt_date(notification.created_on)
 
                )
 
            )
kallithea/tests/functional/test_changeset_comments.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.model.db import ChangesetComment, Notification, \
 
    UserNotification
 
from kallithea.model.meta import Session
 

	
 

	
 
class TestChangeSetCommentsController(TestController):
 

	
 
    def setUp(self):
 
        for x in ChangesetComment.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
        for x in Notification.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
    def tearDown(self):
 
        for x in ChangesetComment.query().all():
 
            Session().delete(x)
 
        Session().commit()
 

	
 
        for x in Notification.query().all():
 
            Session().delete(x)
 
        Session().commit()
 
        self.remove_all_notifications()
 

	
 
    def test_create(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params)
 
        # Test response...
 
        self.assertEqual(response.status, '302 Found')
 
        response.follow()
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 

	
 
        self.assertEqual(Notification.query().count(), 1)
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 

	
 
        notification = Notification.query().all()[0]
 

	
 
        ID = ChangesetComment.query().first().comment_id
 
        self.assertEqual(notification.type_,
 
                         Notification.TYPE_CHANGESET_COMMENT)
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        self.assertTrue(sbj in notification.subject)
 

	
 
    def test_create_inline(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 
        f_path = 'vcs/web/simplevcs/views/repository.py'
 
        line = 'n1'
 

	
 
        params = {'text': text, 'f_path': f_path, 'line': line, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params)
 
        # Test response...
 
        self.assertEqual(response.status, '302 Found')
 
        response.follow()
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        #test DB
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (1 inline, 0 general)'''
 
        )
 
        response.mustcontain(
 
            '''<div style="display:none" class="inline-comment-placeholder" '''
 
            '''path="vcs/web/simplevcs/views/repository.py" '''
 
            '''target_id="vcswebsimplevcsviewsrepositorypy">'''
 
        )
 

	
 
        self.assertEqual(Notification.query().count(), 1)
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 

	
 
        notification = Notification.query().all()[0]
 
        ID = ChangesetComment.query().first().comment_id
 
        self.assertEqual(notification.type_,
 
                         Notification.TYPE_CHANGESET_COMMENT)
 
        sbj = (u'/%s/changeset/'
 
               '27cd5cce30c96924232dffcd24178a07ffeb5dfc#comment-%s' % (HG_REPO, ID))
 
        print "%s vs %s" % (sbj, notification.subject)
 
        self.assertTrue(sbj in notification.subject)
 

	
 
    def test_create_with_mention(self):
 
        self.log_user()
 

	
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'@%s check CommentOnRevision' % TEST_USER_REGULAR_LOGIN
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params)
 
        # Test response...
 
        self.assertEqual(response.status, '302 Found')
 
        response.follow()
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        # test DB
 
        self.assertEqual(ChangesetComment.query().count(), 1)
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 1 comment (0 inline, 1 general)'''
 
        )
 

	
 
        self.assertEqual(Notification.query().count(), 2)
 
        users = [x.user.username for x in UserNotification.query().all()]
 

	
 
        # test_regular gets notification by @mention
 
        self.assertEqual(sorted(users), [TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN])
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        text = u'CommentOnRevision'
 

	
 
        params = {'text': text, '_authentication_token': self.authentication_token()}
 
        response = self.app.post(url(controller='changeset', action='comment',
 
                                     repo_name=HG_REPO, revision=rev),
 
                                     params=params)
 

	
 
        comments = ChangesetComment.query().all()
 
        self.assertEqual(len(comments), 1)
 
        comment_id = comments[0].comment_id
 

	
 
        self.app.delete(url(controller='changeset',
 
                                    action='delete_comment',
 
                                    repo_name=HG_REPO,
 
                                    comment_id=comment_id))
 

	
 
        comments = ChangesetComment.query().all()
 
        self.assertEqual(len(comments), 0)
 

	
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                repo_name=HG_REPO, revision=rev))
 
        response.mustcontain(
 
            '''<div class="comments-number">'''
 
            ''' 0 comments (0 inline, 0 general)'''
 
        )
kallithea/tests/functional/test_login.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
from __future__ import with_statement
 
import mock
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.utils2 import generate_api_key
 
from kallithea.lib.auth import check_password
 
from kallithea.lib import helpers as h
 
from kallithea.model.api_key import ApiKeyModel
 
from kallithea.model import validators
 
from kallithea.model.db import User, Notification
 
from kallithea.model.meta import Session
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestLoginController(TestController):
 

	
 
    def tearDown(self):
 
        for n in Notification.query().all():
 
            Session().delete(n)
 

	
 
        Session().commit()
 
    def setUp(self):
 
        self.remove_all_notifications()
 
        self.assertEqual(Notification.query().all(), [])
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertEqual(response.status, '200 OK')
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertEqual(response.session['authuser'].get('username'),
 
                         TEST_USER_ADMIN_LOGIN)
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertEqual(response.session['authuser'].get('username'),
 
                         TEST_USER_REGULAR_LOGIN)
 
        response = response.follow()
 
        response.mustcontain('/%s' % HG_REPO)
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        response = response.follow()
 

	
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('Users Administration')
 

	
 
    def test_logout(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_REGULAR_LOGIN,
 
                                  'password': TEST_USER_REGULAR_PASS})
 

	
 
        # Verify that a login session has been established.
 
        response = self.app.get(url(controller='login', action='index'))
 
        response = response.follow()
 
        self.assertIn('authuser', response.session)
 

	
 
        response.click('Log Out')
 

	
 
        # Verify that the login session has been terminated.
 
        response = self.app.get(url(controller='login', action='index'))
 
        self.assertNotIn('authuser', response.session)
 

	
 
    @parameterized.expand([
 
          ('data:text/html,<script>window.alert("xss")</script>',),
 
          ('mailto:test@example.com',),
 
          ('file:///etc/passwd',),
 
          ('ftp://some.ftp.server',),
 
          ('http://other.domain/bl%C3%A5b%C3%A6rgr%C3%B8d',),
 
    ])
 
    def test_login_bad_came_froms(self, url_came_from):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=url_came_from),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertEqual(response._environ['paste.testing_variables']
 
                         ['tmpl_context'].came_from, '/')
 
        response = response.follow()
 

	
 
        self.assertEqual(response.status, '200 OK')
 

	
 
    def test_login_short_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': 'as'})
 
        self.assertEqual(response.status, '200 OK')
 

	
 
        response.mustcontain('Enter 3 characters or more')
 

	
 
    def test_login_wrong_username_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': 'error',
 
                                  'password': 'test12'})
 

	
 
        response.mustcontain('invalid user name')
 
        response.mustcontain('invalid password')
 

	
 
    # verify that get arguments are correctly passed along login redirection
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_redirection_to_login_form_preserves_get_args(self, args, args_encoded):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='summary', action='index',
 
                                        repo_name=HG_REPO,
 
                                        **args))
 
            self.assertEqual(response.status, '302 Found')
 
            for encoded in args_encoded:
 
                self.assertIn(encoded, response.location)
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_preserves_get_args(self, args, args_encoded):
 
        response = self.app.get(url(controller='login', action='index',
 
                                    came_from = '/_admin/users',
 
                                    **args))
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.form.action)
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_redirection_after_successful_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from = '/_admin/users',
 
                                     **args),
 
                                 {'username': TEST_USER_ADMIN_LOGIN,
 
                                  'password': TEST_USER_ADMIN_PASS})
 
        self.assertEqual(response.status, '302 Found')
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.location)
 

	
 
    @parameterized.expand([
 
        ({'foo':'one', 'bar':'two'}, ('foo=one', 'bar=two')),
 
        ({'blue': u'blå'.encode('utf-8'), 'green':u'grøn'},
 
             ('blue=bl%C3%A5', 'green=gr%C3%B8n')),
 
    ])
 
    def test_login_form_after_incorrect_login_preserves_get_args(self, args, args_encoded):
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from = '/_admin/users',
 
                                     **args),
 
                                 {'username': 'error',
 
                                  'password': 'test12'})
 

	
 
        response.mustcontain('invalid user name')
 
        response.mustcontain('invalid password')
 
        for encoded in args_encoded:
 
            self.assertIn(encoded, response.form.action)
 

	
 
    #==========================================================================
 
    # REGISTRATIONS
 
    #==========================================================================
 
    def test_register(self):
 
        response = self.app.get(url(controller='login', action='register'))
 
        response.mustcontain('Sign Up')
 

	
 
    def test_register_err_same_username(self):
 
        uname = TEST_USER_ADMIN_LOGIN
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': uname,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmail@domain.com',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': uname})
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'test_admin_0',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': TEST_USER_ADMIN_EMAIL,
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email_case_sensitive(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'test_admin_1',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': TEST_USER_ADMIN_EMAIL.title(),
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
kallithea/tests/models/test_notifications.py
Show inline comments
 
from kallithea.tests import *
 

	
 
from kallithea.model.db import User, Notification, UserNotification
 
from kallithea.model.user import UserModel
 

	
 
from kallithea.model.meta import Session
 
from kallithea.model.notification import NotificationModel
 

	
 

	
 
class TestNotifications(BaseTestCase):
 

	
 
    def __init__(self, methodName='runTest'):
 
        Session.remove()
 
        self.u1 = UserModel().create_or_update(username=u'u1',
 
                                        password=u'qweqwe',
 
                                        email=u'u1@example.com',
 
                                        firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        self.u1 = self.u1.user_id
 

	
 
        self.u2 = UserModel().create_or_update(username=u'u2',
 
                                        password=u'qweqwe',
 
                                        email=u'u2@example.com',
 
                                        firstname=u'u2', lastname=u'u3')
 
        Session().commit()
 
        self.u2 = self.u2.user_id
 

	
 
        self.u3 = UserModel().create_or_update(username=u'u3',
 
                                        password=u'qweqwe',
 
                                        email=u'u3@example.com',
 
                                        firstname=u'u3', lastname=u'u3')
 
        Session().commit()
 
        self.u3 = self.u3.user_id
 

	
 
        super(TestNotifications, self).__init__(methodName=methodName)
 

	
 
    def _clean_notifications(self):
 
        for n in Notification.query().all():
 
            Session().delete(n)
 

	
 
        Session().commit()
 
        self.assertEqual(Notification.query().all(), [])
 

	
 
    def setUp(self):
 
        self._clean_notifications()
 

	
 
    def tearDown(self):
 
        self._clean_notifications()
 

	
 
    def test_create_notification(self):
 
        self.remove_all_notifications()
 
        self.assertEqual([], Notification.query().all())
 
        self.assertEqual([], UserNotification.query().all())
 

	
 
    def test_create_notification(self):
 
        usrs = [self.u1, self.u2]
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'subj', body=u'hi there',
 
                                           recipients=usrs)
 
        Session().commit()
 
        u1 = User.get(self.u1)
 
        u2 = User.get(self.u2)
 
        u3 = User.get(self.u3)
 
        notifications = Notification.query().all()
 
        self.assertEqual(len(notifications), 1)
 

	
 
        self.assertEqual(notifications[0].recipients, [u1, u2])
 
        self.assertEqual(notification.notification_id,
 
                         notifications[0].notification_id)
 

	
 
        unotification = UserNotification.query()\
 
            .filter(UserNotification.notification == notification).all()
 

	
 
        self.assertEqual(len(unotification), len(usrs))
 
        self.assertEqual(set([x.user.user_id for x in unotification]),
 
                         set(usrs))
 

	
 
    def test_user_notifications(self):
 
        self.assertEqual([], Notification.query().all())
 
        self.assertEqual([], UserNotification.query().all())
 

	
 
        notification1 = NotificationModel().create(created_by=self.u1,
 
                                            subject=u'subj', body=u'hi there1',
 
                                            recipients=[self.u3])
 
        Session().commit()
 
        notification2 = NotificationModel().create(created_by=self.u1,
 
                                            subject=u'subj', body=u'hi there2',
 
                                            recipients=[self.u3])
 
        Session().commit()
 
        u3 = Session().query(User).get(self.u3)
 

	
 
        self.assertEqual(sorted([x.notification for x in u3.notifications]),
 
                         sorted([notification2, notification1]))
 

	
 
    def test_delete_notifications(self):
 
        self.assertEqual([], Notification.query().all())
 
        self.assertEqual([], UserNotification.query().all())
 

	
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 
        notifications = Notification.query().all()
 
        self.assertTrue(notification in notifications)
 

	
 
        Notification.delete(notification.notification_id)
 
        Session().commit()
 

	
 
        notifications = Notification.query().all()
 
        self.assertFalse(notification in notifications)
 

	
 
        un = UserNotification.query().filter(UserNotification.notification
 
                                             == notification).all()
 
        self.assertEqual(un, [])
 

	
 
    def test_delete_association(self):
 

	
 
        self.assertEqual([], Notification.query().all())
 
        self.assertEqual([], UserNotification.query().all())
 

	
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 

	
 
        unotification = UserNotification.query()\
 
                            .filter(UserNotification.notification ==
 
                                    notification)\
 
                            .filter(UserNotification.user_id == self.u3)\
 
                            .scalar()
 

	
 
        self.assertEqual(unotification.user_id, self.u3)
 

	
 
        NotificationModel().delete(self.u3,
 
                                   notification.notification_id)
 
        Session().commit()
 

	
 
        u3notification = UserNotification.query()\
 
                            .filter(UserNotification.notification ==
 
                                    notification)\
 
                            .filter(UserNotification.user_id == self.u3)\
 
                            .scalar()
 

	
 
        self.assertEqual(u3notification, None)
 

	
 
        # notification object is still there
 
        self.assertEqual(Notification.query().all(), [notification])
 

	
 
        #u1 and u2 still have assignments
 
        u1notification = UserNotification.query()\
 
                            .filter(UserNotification.notification ==
 
                                    notification)\
 
                            .filter(UserNotification.user_id == self.u1)\
 
                            .scalar()
 
        self.assertNotEqual(u1notification, None)
 
        u2notification = UserNotification.query()\
 
                            .filter(UserNotification.notification ==
 
                                    notification)\
 
                            .filter(UserNotification.user_id == self.u2)\
 
                            .scalar()
 
        self.assertNotEqual(u2notification, None)
 

	
 
    def test_notification_counter(self):
 
        self.assertEqual([], Notification.query().all())
 
        self.assertEqual([], UserNotification.query().all())
 

	
 
        NotificationModel().create(created_by=self.u1,
 
                            subject=u'title', body=u'hi there_delete',
 
                            recipients=[self.u3, self.u1])
 
        Session().commit()
 

	
 
        self.assertEqual(NotificationModel()
 
                         .get_unread_cnt_for_user(self.u1), 1)
 
        self.assertEqual(NotificationModel()
 
                         .get_unread_cnt_for_user(self.u2), 0)
 
        self.assertEqual(NotificationModel()
 
                         .get_unread_cnt_for_user(self.u3), 1)
 

	
 
        notification = NotificationModel().create(created_by=self.u1,
 
                                           subject=u'title', body=u'hi there3',
 
                                    recipients=[self.u3, self.u1, self.u2])
 
        Session().commit()
 

	
 
        self.assertEqual(NotificationModel()
 
                         .get_unread_cnt_for_user(self.u1), 2)
 
        self.assertEqual(NotificationModel()
 
                         .get_unread_cnt_for_user(self.u2), 1)
 
        self.assertEqual(NotificationModel()
 
                         .get_unread_cnt_for_user(self.u3), 2)
0 comments (0 inline, 0 general)