Changeset - f5841b61a582
[Not reviewed]
default
0 6 0
Thomas De Schampheleire - 7 years ago 2018-12-05 21:37:21
thomas.de_schampheleire@nokia.com
model: move notification types from Notification to NotificationModel

This commit is part of the removal of the UI notification feature from
Kallithea, which is not deemed useful in its current form. Only email
notifications are preserved.

As there is no database storage of notifications anymore, the Notification
class will be removed. However, the notification type definitions are still
used for email notifications, and need to live somewhere. As creating
notifications is always passing via NotificationModel, it makes sense to
move the types there.
6 files changed with 30 insertions and 31 deletions:
0 comments (0 inline, 0 general)
kallithea/model/comment.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.comment
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
comments model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 11, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 

	
 
from tg.i18n import ugettext as _
 
from collections import defaultdict
 

	
 
from kallithea.lib.utils2 import extract_mentioned_users, safe_unicode
 
from kallithea.lib import helpers as h
 
from kallithea.model.db import ChangesetComment, User, \
 
    Notification, PullRequest, Repository
 
    PullRequest, Repository
 
from kallithea.model.notification import NotificationModel
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _list_changeset_commenters(revision):
 
    return (Session().query(User)
 
        .join(ChangesetComment.author)
 
        .filter(ChangesetComment.revision == revision)
 
        .all())
 

	
 
def _list_pull_request_commenters(pull_request):
 
    return (Session().query(User)
 
        .join(ChangesetComment.author)
 
        .filter(ChangesetComment.pull_request_id == pull_request.pull_request_id)
 
        .all())
 

	
 

	
 
class ChangesetCommentsModel(object):
 

	
 
    def _get_notification_data(self, repo, comment, author, comment_text,
 
                               line_no=None, revision=None, pull_request=None,
 
                               status_change=None, closing_pr=False):
 
        """
 
        :returns: tuple (subj,body,recipients,notification_type,email_kwargs)
 
        """
 
        # make notification
 
        body = comment_text  # text of the comment
 
        line = ''
 
        if line_no:
 
            line = _('on line %s') % line_no
 

	
 
        # changeset
 
        if revision:
 
            notification_type = Notification.TYPE_CHANGESET_COMMENT
 
            notification_type = NotificationModel.TYPE_CHANGESET_COMMENT
 
            cs = repo.scm_instance.get_changeset(revision)
 
            desc = cs.short_id
 

	
 
            threading = ['%s-rev-%s@%s' % (repo.repo_name, revision, h.canonical_hostname())]
 
            if line_no: # TODO: url to file _and_ line number
 
                threading.append('%s-rev-%s-line-%s@%s' % (repo.repo_name, revision, line_no,
 
                                                           h.canonical_hostname()))
 
            comment_url = h.canonical_url('changeset_home',
 
                repo_name=repo.repo_name,
 
                revision=revision,
 
                anchor='comment-%s' % comment.comment_id)
 
            subj = safe_unicode(
 
                h.link_to('Re changeset: %(desc)s %(line)s' % \
 
                          {'desc': desc, 'line': line},
 
                          comment_url)
 
            )
 
            # get the current participants of this changeset
 
            recipients = _list_changeset_commenters(revision)
 
            # add changeset author if it's known locally
 
            cs_author = User.get_from_cs_author(cs.author)
 
            if not cs_author:
 
                # use repo owner if we cannot extract the author correctly
 
                # FIXME: just use committer name even if not a user
 
                cs_author = repo.owner
 
            recipients.append(cs_author)
 

	
 
            email_kwargs = {
 
                'status_change': status_change,
 
                'cs_comment_user': author.full_name_and_username,
 
                'cs_target_repo': h.canonical_url('summary_home', repo_name=repo.repo_name),
 
                'cs_comment_url': comment_url,
 
                'cs_url': h.canonical_url('changeset_home', repo_name=repo.repo_name, revision=revision),
 
                'raw_id': revision,
 
                'message': cs.message,
 
                'message_short': h.shorter(cs.message, 50, firstline=True),
 
                'cs_author': cs_author,
 
                'repo_name': repo.repo_name,
 
                'short_id': h.short_id(revision),
 
                'branch': cs.branch,
 
                'comment_username': author.username,
 
                'threading': threading,
 
            }
 
        # pull request
 
        elif pull_request:
 
            notification_type = Notification.TYPE_PULL_REQUEST_COMMENT
 
            notification_type = NotificationModel.TYPE_PULL_REQUEST_COMMENT
 
            desc = comment.pull_request.title
 
            _org_ref_type, org_ref_name, _org_rev = comment.pull_request.org_ref.split(':')
 
            _other_ref_type, other_ref_name, _other_rev = comment.pull_request.other_ref.split(':')
 
            threading = ['%s-pr-%s@%s' % (pull_request.other_repo.repo_name,
 
                                          pull_request.pull_request_id,
 
                                          h.canonical_hostname())]
 
            if line_no: # TODO: url to file _and_ line number
 
                threading.append('%s-pr-%s-line-%s@%s' % (pull_request.other_repo.repo_name,
 
                                                          pull_request.pull_request_id, line_no,
 
                                                          h.canonical_hostname()))
 
            comment_url = pull_request.url(canonical=True,
 
                anchor='comment-%s' % comment.comment_id)
 
            subj = safe_unicode(
 
                h.link_to('Re pull request %(pr_nice_id)s: %(desc)s %(line)s' %
 
                          {'desc': desc,
 
                           'pr_nice_id': comment.pull_request.nice_id(),
 
                           'line': line},
 
                          comment_url)
 
            )
 
            # get the current participants of this pull request
 
            recipients = _list_pull_request_commenters(pull_request)
 
            recipients.append(pull_request.owner)
 
            recipients += pull_request.get_reviewer_users()
 

	
 
            # set some variables for email notification
 
            email_kwargs = {
 
                'pr_title': pull_request.title,
 
                'pr_title_short': h.shorter(pull_request.title, 50),
 
                'pr_nice_id': pull_request.nice_id(),
 
                'status_change': status_change,
 
                'closing_pr': closing_pr,
 
                'pr_comment_url': comment_url,
 
                'pr_url': pull_request.url(canonical=True),
 
                'pr_comment_user': author.full_name_and_username,
 
                'pr_target_repo': h.canonical_url('summary_home',
 
                                   repo_name=pull_request.other_repo.repo_name),
 
                'pr_target_branch': other_ref_name,
 
                'pr_source_repo': h.canonical_url('summary_home',
 
                                   repo_name=pull_request.org_repo.repo_name),
 
                'pr_source_branch': org_ref_name,
 
                'pr_owner': pull_request.owner,
 
                'pr_owner_username': pull_request.owner.username,
 
                'repo_name': pull_request.other_repo.repo_name,
 
                'comment_username': author.username,
 
                'threading': threading,
 
            }
 

	
 
        return subj, body, recipients, notification_type, email_kwargs
 

	
 
    def create(self, text, repo, author, revision=None, pull_request=None,
 
               f_path=None, line_no=None, status_change=None, closing_pr=False,
 
               send_email=True):
 
        """
 
        Creates a new comment for either a changeset or a pull request.
 
        status_change and closing_pr is only for the optional email.
 

	
 
        Returns the created comment.
 
        """
 
        if not status_change and not text:
 
            log.warning('Missing text for comment, skipping...')
 
            return None
 

	
 
        repo = Repository.guess_instance(repo)
 
        author = User.guess_instance(author)
 
        comment = ChangesetComment()
 
        comment.repo = repo
 
        comment.author = author
 
        comment.text = text
 
        comment.f_path = f_path
 
        comment.line_no = line_no
 

	
 
        if revision is not None:
 
            comment.revision = revision
 
        elif pull_request is not None:
 
            pull_request = PullRequest.guess_instance(pull_request)
 
            comment.pull_request = pull_request
 
        else:
 
            raise Exception('Please specify revision or pull_request_id')
 

	
 
        Session().add(comment)
 
        Session().flush()
 

	
 
        if send_email:
 
            (subj, body, recipients, notification_type,
 
             email_kwargs) = self._get_notification_data(
 
                                repo, comment, author,
 
                                comment_text=text,
 
                                line_no=line_no,
 
                                revision=revision,
 
                                pull_request=pull_request,
 
                                status_change=status_change,
 
                                closing_pr=closing_pr)
 
            email_kwargs['is_mention'] = False
 
            # create notification objects, and emails
 
            NotificationModel().create(
 
                created_by=author, subject=subj, body=body,
kallithea/model/db.py
Show inline comments
 
@@ -2385,200 +2385,192 @@ class PullRequest(Base, BaseDbModel):
 

	
 
        return q
 

	
 
    def get_reviewer_users(self):
 
        """Like .reviewers, but actually returning the users"""
 
        return User.query() \
 
            .join(PullRequestReviewer) \
 
            .filter(PullRequestReviewer.pull_request == self) \
 
            .order_by(PullRequestReviewer.pull_request_reviewers_id) \
 
            .all()
 

	
 
    def is_closed(self):
 
        return self.status == self.STATUS_CLOSED
 

	
 
    def user_review_status(self, user_id):
 
        """Return the user's latest status votes on PR"""
 
        # note: no filtering on repo - that would be redundant
 
        status = ChangesetStatus.query() \
 
            .filter(ChangesetStatus.pull_request == self) \
 
            .filter(ChangesetStatus.user_id == user_id) \
 
            .order_by(ChangesetStatus.version) \
 
            .first()
 
        return str(status.status) if status else ''
 

	
 
    @classmethod
 
    def make_nice_id(cls, pull_request_id):
 
        '''Return pull request id nicely formatted for displaying'''
 
        return '#%s' % pull_request_id
 

	
 
    def nice_id(self):
 
        '''Return the id of this pull request, nicely formatted for displaying'''
 
        return self.make_nice_id(self.pull_request_id)
 

	
 
    def get_api_data(self):
 
        return self.__json__()
 

	
 
    def __json__(self):
 
        return dict(
 
            pull_request_id=self.pull_request_id,
 
            url=self.url(),
 
            reviewers=self.reviewers,
 
            revisions=self.revisions,
 
            owner=self.owner.username,
 
            title=self.title,
 
            description=self.description,
 
            org_repo_url=self.org_repo.clone_url(),
 
            org_ref_parts=self.org_ref_parts,
 
            other_ref_parts=self.other_ref_parts,
 
            status=self.status,
 
            comments=self.comments,
 
            statuses=self.statuses,
 
        )
 

	
 
    def url(self, **kwargs):
 
        canonical = kwargs.pop('canonical', None)
 
        import kallithea.lib.helpers as h
 
        b = self.org_ref_parts[1]
 
        if b != self.other_ref_parts[1]:
 
            s = '/_/' + b
 
        else:
 
            s = '/_/' + self.title
 
        kwargs['extra'] = urlreadable(s)
 
        if canonical:
 
            return h.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                                   pull_request_id=self.pull_request_id, **kwargs)
 
        return h.url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                     pull_request_id=self.pull_request_id, **kwargs)
 

	
 

	
 
class PullRequestReviewer(Base, BaseDbModel):
 
    __tablename__ = 'pull_request_reviewers'
 
    __table_args__ = (
 
        Index('pull_request_reviewers_user_id_idx', 'user_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    def __init__(self, user=None, pull_request=None):
 
        self.user = user
 
        self.pull_request = pull_request
 

	
 
    pull_request_reviewers_id = Column('pull_requests_reviewers_id', Integer(), primary_key=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    pull_request = relationship('PullRequest')
 

	
 
    def __json__(self):
 
        return dict(
 
            username=self.user.username if self.user else None,
 
        )
 

	
 

	
 
class Notification(object):
 
    __tablename__ = 'notifications'
 

	
 
    TYPE_CHANGESET_COMMENT = u'cs_comment'
 
    TYPE_MESSAGE = u'message'
 
    TYPE_MENTION = u'mention' # not used
 
    TYPE_REGISTRATION = u'registration'
 
    TYPE_PULL_REQUEST = u'pull_request'
 
    TYPE_PULL_REQUEST_COMMENT = u'pull_request_comment'
 

	
 

	
 
class UserNotification(object):
 
    __tablename__ = 'user_to_notification'
 

	
 

	
 
class Gist(Base, BaseDbModel):
 
    __tablename__ = 'gists'
 
    __table_args__ = (
 
        Index('g_gist_access_id_idx', 'gist_access_id'),
 
        Index('g_created_on_idx', 'created_on'),
 
        _table_args_default_dict,
 
    )
 

	
 
    GIST_PUBLIC = u'public'
 
    GIST_PRIVATE = u'private'
 
    DEFAULT_FILENAME = u'gistfile1.txt'
 

	
 
    gist_id = Column(Integer(), primary_key=True)
 
    gist_access_id = Column(Unicode(250), nullable=False)
 
    gist_description = Column(UnicodeText(), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    gist_expires = Column(Float(53), nullable=False)
 
    gist_type = Column(Unicode(128), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.gist_expires != -1) & (time.time() > self.gist_expires)
 

	
 
    def __repr__(self):
 
        return '<Gist:[%s]%s>' % (self.gist_type, self.gist_access_id)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(Gist, cls).guess_instance(value, Gist.get_by_access_id)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        res = cls.query().filter(cls.gist_access_id == id_).scalar()
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def get_by_access_id(cls, gist_access_id):
 
        return cls.query().filter(cls.gist_access_id == gist_access_id).scalar()
 

	
 
    def gist_url(self):
 
        import kallithea
 
        alias_url = kallithea.CONFIG.get('gist_alias_url')
 
        if alias_url:
 
            return alias_url.replace('{gistid}', self.gist_access_id)
 

	
 
        import kallithea.lib.helpers as h
 
        return h.canonical_url('gist', gist_id=self.gist_access_id)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path where all gists are stored
 

	
 
        :param cls:
 
        """
 
        from kallithea.model.gist import GIST_STORE_LOC
 
        q = Session().query(Ui) \
 
            .filter(Ui.ui_key == URL_SEP)
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return os.path.join(q.one().ui_value, GIST_STORE_LOC)
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating gist related data for API
 
        """
 
        gist = self
 
        data = dict(
 
            gist_id=gist.gist_id,
 
            type=gist.gist_type,
 
            access_id=gist.gist_access_id,
 
            description=gist.gist_description,
 
            url=gist.gist_url(),
 
            expires=gist.gist_expires,
 
            created_on=gist.created_on,
 
        )
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
        )
 
        data.update(self.get_api_data())
 
        return data
 
    ## SCM functions
 

	
 
    @property
 
    def scm_instance(self):
kallithea/model/notification.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.notification
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Model for notifications
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 
import traceback
 

	
 
from tg import tmpl_context as c, app_globals
 
from tg.i18n import ugettext as _
 
from sqlalchemy.orm import joinedload, subqueryload
 

	
 
import kallithea
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import safe_unicode
 
from kallithea.model.db import Notification, User
 
from kallithea.model.db import User
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class NotificationModel(object):
 

	
 
    TYPE_CHANGESET_COMMENT = u'cs_comment'
 
    TYPE_MESSAGE = u'message'
 
    TYPE_MENTION = u'mention' # not used
 
    TYPE_REGISTRATION = u'registration'
 
    TYPE_PULL_REQUEST = u'pull_request'
 
    TYPE_PULL_REQUEST_COMMENT = u'pull_request_comment'
 

	
 
    def create(self, created_by, subject, body, recipients=None,
 
               type_=Notification.TYPE_MESSAGE, with_email=True,
 
               type_=TYPE_MESSAGE, with_email=True,
 
               email_kwargs=None, repo_name=None):
 
        """
 

	
 
        Creates notification of given type
 

	
 
        :param created_by: int, str or User instance. User who created this
 
            notification
 
        :param subject:
 
        :param body:
 
        :param recipients: list of int, str or User objects, when None
 
            is given send to all admins
 
        :param type_: type of notification
 
        :param with_email: send email with this notification
 
        :param email_kwargs: additional dict to pass as args to email template
 
        """
 
        from kallithea.lib.celerylib import tasks
 
        email_kwargs = email_kwargs or {}
 
        if recipients and not getattr(recipients, '__iter__', False):
 
            raise Exception('recipients must be a list or iterable')
 

	
 
        created_by_obj = User.guess_instance(created_by)
 

	
 
        recipients_objs = set()
 
        if recipients:
 
            for u in recipients:
 
                obj = User.guess_instance(u)
 
                if obj is not None:
 
                    recipients_objs.add(obj)
 
                else:
 
                    # TODO: inform user that requested operation couldn't be completed
 
                    log.error('cannot email unknown user %r', u)
 
            log.debug('sending notifications %s to %s',
 
                type_, recipients_objs
 
            )
 
        elif recipients is None:
 
            # empty recipients means to all admins
 
            recipients_objs = User.query().filter(User.admin == True).all()
 
            log.debug('sending notifications %s to admins: %s',
 
                type_, recipients_objs
 
            )
 
        #else: silently skip notification mails?
 

	
 
        if not with_email:
 
            return
 

	
 
        headers = {}
 
        headers['X-Kallithea-Notification-Type'] = type_
 
        if 'threading' in email_kwargs:
 
            headers['References'] = ' '.join('<%s>' % x for x in email_kwargs['threading'])
 

	
 
        # this is passed into template
 
        created_on = h.fmt_date(datetime.datetime.now())
 
        html_kwargs = {
 
                  'subject': subject,
 
                  'body': h.render_w_mentions(body, repo_name),
 
                  'when': created_on,
 
                  'user': created_by_obj.username,
 
                  }
 

	
 
        txt_kwargs = {
 
                  'subject': subject,
 
                  'body': body,
 
                  'when': created_on,
 
                  'user': created_by_obj.username,
 
                  }
 

	
 
        html_kwargs.update(email_kwargs)
 
        txt_kwargs.update(email_kwargs)
 
        email_subject = EmailNotificationModel() \
 
                            .get_email_description(type_, **txt_kwargs)
 
        email_txt_body = EmailNotificationModel() \
 
                            .get_email_tmpl(type_, 'txt', **txt_kwargs)
 
        email_html_body = EmailNotificationModel() \
 
                            .get_email_tmpl(type_, 'html', **html_kwargs)
 

	
 
        # don't send email to person who created this comment
 
        rec_objs = set(recipients_objs).difference(set([created_by_obj]))
 

	
 
        # send email with notification to all other participants
 
        for rec in rec_objs:
 
            tasks.send_email([rec.email], email_subject, email_txt_body,
 
                     email_html_body, headers, author=created_by_obj)
 

	
 

	
 
class EmailNotificationModel(object):
 

	
 
    TYPE_CHANGESET_COMMENT = Notification.TYPE_CHANGESET_COMMENT
 
    TYPE_MESSAGE = Notification.TYPE_MESSAGE # only used for testing
 
    # Notification.TYPE_MENTION is not used
 
    TYPE_CHANGESET_COMMENT = NotificationModel.TYPE_CHANGESET_COMMENT
 
    TYPE_MESSAGE = NotificationModel.TYPE_MESSAGE # only used for testing
 
    # NotificationModel.TYPE_MENTION is not used
 
    TYPE_PASSWORD_RESET = 'password_link'
 
    TYPE_REGISTRATION = Notification.TYPE_REGISTRATION
 
    TYPE_PULL_REQUEST = Notification.TYPE_PULL_REQUEST
 
    TYPE_PULL_REQUEST_COMMENT = Notification.TYPE_PULL_REQUEST_COMMENT
 
    TYPE_REGISTRATION = NotificationModel.TYPE_REGISTRATION
 
    TYPE_PULL_REQUEST = NotificationModel.TYPE_PULL_REQUEST
 
    TYPE_PULL_REQUEST_COMMENT = NotificationModel.TYPE_PULL_REQUEST_COMMENT
 
    TYPE_DEFAULT = 'default'
 

	
 
    def __init__(self):
 
        super(EmailNotificationModel, self).__init__()
 
        self._template_root = kallithea.CONFIG['paths']['templates'][0]
 
        self._tmpl_lookup = app_globals.mako_lookup
 
        self.email_types = {
 
            self.TYPE_CHANGESET_COMMENT: 'changeset_comment',
 
            self.TYPE_PASSWORD_RESET: 'password_reset',
 
            self.TYPE_REGISTRATION: 'registration',
 
            self.TYPE_DEFAULT: 'default',
 
            self.TYPE_PULL_REQUEST: 'pull_request',
 
            self.TYPE_PULL_REQUEST_COMMENT: 'pull_request_comment',
 
        }
 
        self._subj_map = {
 
            self.TYPE_CHANGESET_COMMENT: _('[Comment] %(repo_name)s changeset %(short_id)s "%(message_short)s" on %(branch)s'),
 
            self.TYPE_MESSAGE: 'Test Message',
 
            # self.TYPE_PASSWORD_RESET
 
            self.TYPE_REGISTRATION: _('New user %(new_username)s registered'),
 
            # self.TYPE_DEFAULT
 
            self.TYPE_PULL_REQUEST: _('[Review] %(repo_name)s PR %(pr_nice_id)s "%(pr_title_short)s" from %(pr_source_branch)s by %(pr_owner_username)s'),
 
            self.TYPE_PULL_REQUEST_COMMENT: _('[Comment] %(repo_name)s PR %(pr_nice_id)s "%(pr_title_short)s" from %(pr_source_branch)s by %(pr_owner_username)s'),
 
        }
 

	
 
    def get_email_description(self, type_, **kwargs):
 
        """
 
        return subject for email based on given type
 
        """
 
        tmpl = self._subj_map[type_]
 
        try:
 
            subj = tmpl % kwargs
 
        except KeyError as e:
 
            log.error('error generating email subject for %r from %s: %s', type_, ','.join(self._subj_map.keys()), e)
 
            raise
 
        l = [safe_unicode(x) for x in [kwargs.get('status_change'), kwargs.get('closing_pr') and _('Closing')] if x]
 
        if l:
 
            if subj.startswith('['):
 
                subj = '[' + ', '.join(l) + ': ' + subj[1:]
 
            else:
 
                subj = '[' + ', '.join(l) + '] ' + subj
 
        return subj
 

	
 
    def get_email_tmpl(self, type_, content_type, **kwargs):
 
        """
 
        return generated template for email based on given type
 
        """
 

	
 
        base = 'email_templates/' + self.email_types.get(type_, self.email_types[self.TYPE_DEFAULT]) + '.' + content_type
 
        email_template = self._tmpl_lookup.get_template(base)
 
        # translator and helpers inject
 
        _kwargs = {'_': _,
 
                   'h': h,
 
                   'c': c}
 
        _kwargs.update(kwargs)
 
        if content_type == 'html':
 
            _kwargs.update({
 
                "color_text": "#202020",
 
                "color_emph": "#395fa0",
 
                "color_link": "#395fa0",
 
                "color_border": "#ddd",
 
                "color_background_grey": "#f9f9f9",
 
                "color_button": "#395fa0",
 
                "monospace_style": "font-family:Lucida Console,Consolas,Monaco,Inconsolata,Liberation Mono,monospace",
 
                "sans_style": "font-family:Helvetica,Arial,sans-serif",
 
                })
 
            _kwargs.update({
 
                "default_style": "%(sans_style)s;font-weight:200;font-size:14px;line-height:17px;color:%(color_text)s" % _kwargs,
 
                "comment_style": "%(monospace_style)s;white-space:pre-wrap" % _kwargs,
 
                "data_style": "border:%(color_border)s 1px solid;background:%(color_background_grey)s" % _kwargs,
 
                "emph_style": "font-weight:600;color:%(color_emph)s" % _kwargs,
 
                "link_style": "color:%(color_link)s;text-decoration:none" % _kwargs,
 
                "link_text_style": "color:%(color_text)s;text-decoration:none;border:%(color_border)s 1px solid;background:%(color_background_grey)s" % _kwargs,
 
                })
 

	
 
        log.debug('rendering tmpl %s with kwargs %s', base, _kwargs)
 
        return email_template.render_unicode(**_kwargs)
kallithea/model/pull_request.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.pull_request
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
pull request model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import datetime
 
import re
 

	
 
from tg import request
 
from tg.i18n import ugettext as _
 

	
 
from sqlalchemy.orm import joinedload
 

	
 
from kallithea.model.meta import Session
 
from kallithea.lib import helpers as h
 
from kallithea.model.db import PullRequest, PullRequestReviewer, Notification, \
 
from kallithea.model.db import PullRequest, PullRequestReviewer, \
 
    ChangesetStatus, User
 
from kallithea.model.notification import NotificationModel
 
from kallithea.lib.utils2 import extract_mentioned_users, safe_str, safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _assert_valid_reviewers(seq):
 
    """Sanity check: elements are actual User objects, and not the default user."""
 
    assert not any(user.is_default_user for user in seq)
 

	
 

	
 
class PullRequestModel(object):
 

	
 
    def add_reviewers(self, user, pr, reviewers, mention_recipients=None):
 
        """Add reviewer and send notification to them.
 
        """
 
        reviewers = set(reviewers)
 
        _assert_valid_reviewers(reviewers)
 
        if mention_recipients is not None:
 
            mention_recipients = set(mention_recipients) - reviewers
 
            _assert_valid_reviewers(mention_recipients)
 

	
 
        # members
 
        for reviewer in reviewers:
 
            prr = PullRequestReviewer(reviewer, pr)
 
            Session().add(prr)
 

	
 
        # notification to reviewers
 
        pr_url = pr.url(canonical=True)
 
        threading = ['%s-pr-%s@%s' % (pr.other_repo.repo_name,
 
                                      pr.pull_request_id,
 
                                      h.canonical_hostname())]
 
        subject = safe_unicode(
 
            h.link_to(
 
              _('%(user)s wants you to review pull request %(pr_nice_id)s: %(pr_title)s') % \
 
                {'user': user.username,
 
                 'pr_title': pr.title,
 
                 'pr_nice_id': pr.nice_id()},
 
                pr_url)
 
            )
 
        body = pr.description
 
        _org_ref_type, org_ref_name, _org_rev = pr.org_ref.split(':')
 
        _other_ref_type, other_ref_name, _other_rev = pr.other_ref.split(':')
 
        revision_data = [(x.raw_id, x.message)
 
                         for x in map(pr.org_repo.get_changeset, pr.revisions)]
 
        email_kwargs = {
 
            'pr_title': pr.title,
 
            'pr_title_short': h.shorter(pr.title, 50),
 
            'pr_user_created': user.full_name_and_username,
 
            'pr_repo_url': h.canonical_url('summary_home', repo_name=pr.other_repo.repo_name),
 
            'pr_url': pr_url,
 
            'pr_revisions': revision_data,
 
            'repo_name': pr.other_repo.repo_name,
 
            'org_repo_name': pr.org_repo.repo_name,
 
            'pr_nice_id': pr.nice_id(),
 
            'pr_target_repo': h.canonical_url('summary_home',
 
                               repo_name=pr.other_repo.repo_name),
 
            'pr_target_branch': other_ref_name,
 
            'pr_source_repo': h.canonical_url('summary_home',
 
                               repo_name=pr.org_repo.repo_name),
 
            'pr_source_branch': org_ref_name,
 
            'pr_owner': pr.owner,
 
            'pr_owner_username': pr.owner.username,
 
            'pr_username': user.username,
 
            'threading': threading,
 
            'is_mention': False,
 
            }
 
        if reviewers:
 
            NotificationModel().create(created_by=user, subject=subject, body=body,
 
                                       recipients=reviewers,
 
                                       type_=Notification.TYPE_PULL_REQUEST,
 
                                       type_=NotificationModel.TYPE_PULL_REQUEST,
 
                                       email_kwargs=email_kwargs)
 

	
 
        if mention_recipients:
 
            email_kwargs['is_mention'] = True
 
            subject = _('[Mention]') + ' ' + subject
 
            # FIXME: this subject is wrong and unused!
 
            NotificationModel().create(created_by=user, subject=subject, body=body,
 
                                       recipients=mention_recipients,
 
                                       type_=Notification.TYPE_PULL_REQUEST,
 
                                       type_=NotificationModel.TYPE_PULL_REQUEST,
 
                                       email_kwargs=email_kwargs)
 

	
 
    def mention_from_description(self, user, pr, old_description=''):
 
        mention_recipients = (extract_mentioned_users(pr.description) -
 
                              extract_mentioned_users(old_description))
 

	
 
        log.debug("Mentioning %s", mention_recipients)
 
        self.add_reviewers(user, pr, set(), mention_recipients)
 

	
 
    def remove_reviewers(self, user, pull_request, reviewers):
 
        """Remove specified users from being reviewers of the PR."""
 
        if not reviewers:
 
            return # avoid SQLAlchemy warning about empty sequence for IN-predicate
 

	
 
        PullRequestReviewer.query() \
 
            .filter_by(pull_request=pull_request) \
 
            .filter(PullRequestReviewer.user_id.in_(r.user_id for r in reviewers)) \
 
            .delete(synchronize_session='fetch') # the default of 'evaluate' is not available
 

	
 
    def delete(self, pull_request):
 
        pull_request = PullRequest.guess_instance(pull_request)
 
        Session().delete(pull_request)
 
        if pull_request.org_repo.scm_instance.alias == 'git':
 
            # remove a ref under refs/pull/ so that commits can be garbage-collected
 
            try:
 
                del pull_request.org_repo.scm_instance._repo["refs/pull/%d/head" % pull_request.pull_request_id]
 
            except KeyError:
 
                pass
 

	
 
    def close_pull_request(self, pull_request):
 
        pull_request = PullRequest.guess_instance(pull_request)
 
        pull_request.status = PullRequest.STATUS_CLOSED
 
        pull_request.updated_on = datetime.datetime.now()
 

	
 

	
 
class CreatePullRequestAction(object):
 

	
 
    class ValidationError(Exception):
 
        pass
 

	
 
    class Empty(ValidationError):
 
        pass
 

	
 
    class AmbiguousAncestor(ValidationError):
 
        pass
 

	
 
    class Unauthorized(ValidationError):
 
        pass
 

	
 
    @staticmethod
 
    def is_user_authorized(org_repo, other_repo):
 
        """Performs authorization check with only the minimum amount of
 
        information needed for such a check, rather than a full command
 
        object.
 
        """
 
        if (h.HasRepoPermissionLevel('read')(org_repo.repo_name) and
 
            h.HasRepoPermissionLevel('read')(other_repo.repo_name)):
 
            return True
 

	
 
        return False
 

	
 
    def __init__(self, org_repo, other_repo, org_ref, other_ref, title, description, owner, reviewers):
 
        from kallithea.controllers.compare import CompareController
 
        reviewers = set(reviewers)
 
        _assert_valid_reviewers(reviewers)
 

	
 
        (org_ref_type,
 
         org_ref_name,
 
         org_rev) = org_ref.split(':')
 
        org_display = h.short_ref(org_ref_type, org_ref_name)
 
        if org_ref_type == 'rev':
 
            cs = org_repo.scm_instance.get_changeset(org_rev)
 
            org_ref = 'branch:%s:%s' % (cs.branch, cs.raw_id)
 

	
 
        (other_ref_type,
 
         other_ref_name,
 
         other_rev) = other_ref.split(':')
 
        if other_ref_type == 'rev':
 
            cs = other_repo.scm_instance.get_changeset(other_rev)
 
            other_ref_name = cs.raw_id[:12]
 
            other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, cs.raw_id)
 
        other_display = h.short_ref(other_ref_type, other_ref_name)
 

	
 
        cs_ranges, _cs_ranges_not, ancestor_revs = \
 
            CompareController._get_changesets(org_repo.scm_instance.alias,
 
                                              other_repo.scm_instance, other_rev, # org and other "swapped"
 
                                              org_repo.scm_instance, org_rev,
 
                                              )
 
        if not cs_ranges:
 
            raise self.Empty(_('Cannot create empty pull request'))
 

	
 
        if not ancestor_revs:
 
            ancestor_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        elif len(ancestor_revs) == 1:
 
            ancestor_rev = ancestor_revs[0]
 
        else:
kallithea/model/user.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.user
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
users model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import hashlib
 
import hmac
 
import logging
 
import time
 
import traceback
 

	
 
from tg import config
 
from tg.i18n import ugettext as _
 

	
 
from sqlalchemy.exc import DatabaseError
 

	
 
from kallithea.lib.utils2 import safe_str, generate_api_key, get_current_authuser
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.model.db import Permission, User, UserToPerm, Notification, \
 
from kallithea.model.db import Permission, User, UserToPerm, \
 
    UserEmailMap, UserIpMap
 
from kallithea.lib.exceptions import DefaultUserException, \
 
    UserOwnsReposException
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserModel(object):
 
    password_reset_token_lifetime = 86400 # 24 hours
 

	
 
    def get(self, user_id, cache=False):
 
        user = User.query()
 
        if cache:
 
            user = user.options(FromCache("sql_cache_short",
 
                                          "get_user_%s" % user_id))
 
        return user.get(user_id)
 

	
 
    def get_user(self, user):
 
        return User.guess_instance(user)
 

	
 
    def create(self, form_data, cur_user=None):
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        from kallithea.lib.hooks import log_create_user, \
 
            check_allowed_create_user
 
        _fd = form_data
 
        user_data = {
 
            'username': _fd['username'],
 
            'password': _fd['password'],
 
            'email': _fd['email'],
 
            'firstname': _fd['firstname'],
 
            'lastname': _fd['lastname'],
 
            'active': _fd['active'],
 
            'admin': False
 
        }
 
        # raises UserCreationError if it's not allowed
 
        check_allowed_create_user(user_data, cur_user)
 
        from kallithea.lib.auth import get_crypt_password
 

	
 
        new_user = User()
 
        for k, v in form_data.items():
 
            if k == 'password':
 
                v = get_crypt_password(v)
 
            if k == 'firstname':
 
                k = 'name'
 
            setattr(new_user, k, v)
 

	
 
        new_user.api_key = generate_api_key()
 
        Session().add(new_user)
 
        Session().flush() # make database assign new_user.user_id
 

	
 
        log_create_user(new_user.get_dict(), cur_user)
 
        return new_user
 

	
 
    def create_or_update(self, username, password, email, firstname=u'',
 
                         lastname=u'', active=True, admin=False,
 
                         extern_type=None, extern_name=None, cur_user=None):
 
        """
 
        Creates a new instance if not found, or updates current one
 

	
 
        :param username:
 
        :param password:
 
        :param email:
 
        :param active:
 
        :param firstname:
 
        :param lastname:
 
        :param active:
 
        :param admin:
 
        :param extern_name:
 
        :param extern_type:
 
        :param cur_user:
 
        """
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        from kallithea.lib.auth import get_crypt_password, check_password
 
        from kallithea.lib.hooks import log_create_user, \
 
            check_allowed_create_user
 
        user_data = {
 
            'username': username, 'password': password,
 
            'email': email, 'firstname': firstname, 'lastname': lastname,
 
            'active': active, 'admin': admin
 
        }
 
        # raises UserCreationError if it's not allowed
 
        check_allowed_create_user(user_data, cur_user)
 

	
 
        log.debug('Checking for %s account in Kallithea database', username)
 
        user = User.get_by_username(username, case_insensitive=True)
 
        if user is None:
 
            log.debug('creating new user %s', username)
 
            new_user = User()
 
            edit = False
 
        else:
 
            log.debug('updating user %s', username)
 
            new_user = user
 
            edit = True
 

	
 
        try:
 
            new_user.username = username
 
            new_user.admin = admin
 
            new_user.email = email
 
            new_user.active = active
 
            new_user.extern_name = safe_str(extern_name) \
 
                if extern_name else None
 
            new_user.extern_type = safe_str(extern_type) \
 
                if extern_type else None
 
            new_user.name = firstname
 
            new_user.lastname = lastname
 

	
 
            if not edit:
 
                new_user.api_key = generate_api_key()
 

	
 
            # set password only if creating an user or password is changed
 
            password_change = new_user.password and \
 
                not check_password(password, new_user.password)
 
            if not edit or password_change:
 
                reason = 'new password' if edit else 'new user'
 
                log.debug('Updating password reason=>%s', reason)
 
                new_user.password = get_crypt_password(password) \
 
                    if password else ''
 

	
 
            if user is None:
 
                Session().add(new_user)
 
                Session().flush() # make database assign new_user.user_id
 

	
 
            if not edit:
 
                log_create_user(new_user.get_dict(), cur_user)
 

	
 
            return new_user
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_registration(self, form_data):
 
        from kallithea.model.notification import NotificationModel
 
        import kallithea.lib.helpers as h
 

	
 
        form_data['admin'] = False
 
        form_data['extern_type'] = User.DEFAULT_AUTH_TYPE
 
        form_data['extern_name'] = ''
 
        new_user = self.create(form_data)
 

	
 
        # notification to admins
 
        subject = _('New user registration')
 
        body = (
 
            u'New user registration\n'
 
            '---------------------\n'
 
            '- Username: {user.username}\n'
 
            '- Full Name: {user.full_name}\n'
 
            '- Email: {user.email}\n'
 
            ).format(user=new_user)
 
        edit_url = h.canonical_url('edit_user', id=new_user.user_id)
 
        email_kwargs = {
 
            'registered_user_url': edit_url,
 
            'new_username': new_user.username,
 
            'new_email': new_user.email,
 
            'new_full_name': new_user.full_name}
 
        NotificationModel().create(created_by=new_user, subject=subject,
 
                                   body=body, recipients=None,
 
                                   type_=Notification.TYPE_REGISTRATION,
 
                                   type_=NotificationModel.TYPE_REGISTRATION,
 
                                   email_kwargs=email_kwargs)
 

	
 
    def update(self, user_id, form_data, skip_attrs=None):
 
        from kallithea.lib.auth import get_crypt_password
 
        skip_attrs = skip_attrs or []
 
        user = self.get(user_id, cache=False)
 
        if user.is_default_user:
 
            raise DefaultUserException(
 
                            _("You can't edit this user since it's "
 
                              "crucial for entire application"))
 

	
 
        for k, v in form_data.items():
 
            if k in skip_attrs:
 
                continue
 
            if k == 'new_password' and v:
 
                user.password = get_crypt_password(v)
 
            else:
 
                # old legacy thing orm models store firstname as name,
 
                # need proper refactor to username
 
                if k == 'firstname':
 
                    k = 'name'
 
                setattr(user, k, v)
 

	
 
    def update_user(self, user, **kwargs):
 
        from kallithea.lib.auth import get_crypt_password
 

	
 
        user = User.guess_instance(user)
 
        if user.is_default_user:
 
            raise DefaultUserException(
 
                _("You can't edit this user since it's"
 
                  " crucial for entire application")
 
            )
 

	
 
        for k, v in kwargs.items():
 
            if k == 'password' and v:
 
                v = get_crypt_password(v)
 

	
 
            setattr(user, k, v)
 
        return user
 

	
 
    def delete(self, user, cur_user=None):
 
        if cur_user is None:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 
        user = User.guess_instance(user)
 

	
 
        if user.is_default_user:
 
            raise DefaultUserException(
 
                _("You can't remove this user since it is"
 
                  " crucial for the entire application"))
 
        if user.repositories:
 
            repos = [x.repo_name for x in user.repositories]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s repositories and cannot be '
 
                  'removed. Switch owners or remove those repositories: %s')
 
                % (user.username, len(repos), ', '.join(repos)))
 
        if user.repo_groups:
 
            repogroups = [x.group_name for x in user.repo_groups]
 
            raise UserOwnsReposException(_(
 
                'User "%s" still owns %s repository groups and cannot be '
 
                'removed. Switch owners or remove those repository groups: %s')
 
                % (user.username, len(repogroups), ', '.join(repogroups)))
 
        if user.user_groups:
 
            usergroups = [x.users_group_name for x in user.user_groups]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s user groups and cannot be '
 
                  'removed. Switch owners or remove those user groups: %s')
 
                % (user.username, len(usergroups), ', '.join(usergroups)))
 
        Session().delete(user)
 

	
 
        from kallithea.lib.hooks import log_delete_user
 
        log_delete_user(user.get_dict(), cur_user)
 

	
 
    def can_change_password(self, user):
 
        from kallithea.lib import auth_modules
 
        managed_fields = auth_modules.get_managed_fields(user)
 
        return 'password' not in managed_fields
 

	
 
    def get_reset_password_token(self, user, timestamp, session_id):
 
        """
 
        The token is a 40-digit hexstring, calculated as a HMAC-SHA1.
 

	
 
        In a traditional HMAC scenario, an attacker is unable to know or
 
        influence the secret key, but can know or influence the message
 
        and token. This scenario is slightly different (in particular
 
        since the message sender is also the message recipient), but
 
        sufficiently similar to use an HMAC. Benefits compared to a plain
 
        SHA1 hash includes resistance against a length extension attack.
 

	
 
        The HMAC key consists of the following values (known only to the
 
        server and authorized users):
 

	
 
        * per-application secret (the `app_instance_uuid` setting), without
 
          which an attacker cannot counterfeit tokens
 
        * hashed user password, invalidating the token upon password change
 

	
 
        The HMAC message consists of the following values (potentially known
kallithea/tests/models/test_notifications.py
Show inline comments
 
import os
 
import re
 

	
 
import mock
 
import routes.util
 

	
 
from kallithea.tests.base import *
 
from kallithea.lib import helpers as h
 
from kallithea.model.db import User, Notification
 
from kallithea.model.db import User
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 
from kallithea.model.notification import NotificationModel, EmailNotificationModel
 

	
 
import kallithea.lib.celerylib
 
import kallithea.lib.celerylib.tasks
 

	
 
from tg.util.webtest import test_context
 

	
 

	
 
class TestNotifications(TestController):
 

	
 
    def setup_method(self, method):
 
        Session.remove()
 
        u1 = UserModel().create_or_update(username=u'u1',
 
                                        password=u'qweqwe',
 
                                        email=u'u1@example.com',
 
                                        firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        self.u1 = u1.user_id
 

	
 
        u2 = UserModel().create_or_update(username=u'u2',
 
                                        password=u'qweqwe',
 
                                        email=u'u2@example.com',
 
                                        firstname=u'u2', lastname=u'u3')
 
        Session().commit()
 
        self.u2 = u2.user_id
 

	
 
        u3 = UserModel().create_or_update(username=u'u3',
 
                                        password=u'qweqwe',
 
                                        email=u'u3@example.com',
 
                                        firstname=u'u3', lastname=u'u3')
 
        Session().commit()
 
        self.u3 = u3.user_id
 

	
 
    def test_create_notification(self):
 
        with test_context(self.app):
 
            usrs = [self.u1, self.u2]
 

	
 
            def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
                assert recipients == ['u2@example.com']
 
                assert subject == 'Test Message'
 
                assert body == u"hi there"
 
                assert '>hi there<' in html_body
 
                assert author.username == 'u1'
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                NotificationModel().create(created_by=self.u1,
 
                                                   subject=u'subj', body=u'hi there',
 
                                                   recipients=usrs)
 

	
 
    @mock.patch.object(h, 'canonical_url', (lambda arg, **kwargs: 'http://%s/?%s' % (arg, '&'.join('%s=%s' % (k, v) for (k, v) in sorted(kwargs.items())))))
 
    def test_dump_html_mails(self):
 
        # Exercise all notification types and dump them to one big html file
 
        l = []
 

	
 
        def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
            l.append('<hr/>\n')
 
            l.append('<h1>%s</h1>\n' % desc) # desc is from outer scope
 
            l.append('<pre>\n')
 
            l.append('From: %s\n' % author.username)
 
            l.append('To: %s\n' % ' '.join(recipients))
 
            l.append('Subject: %s\n' % subject)
 
            l.append('</pre>\n')
 
            l.append('<hr/>\n')
 
            l.append('<pre>%s</pre>\n' % body)
 
            l.append('<hr/>\n')
 
            l.append(html_body)
 
            l.append('<hr/>\n')
 

	
 
        with test_context(self.app):
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                pr_kwargs = dict(
 
                    pr_nice_id='#7',
 
                    pr_title='The Title',
 
                    pr_title_short='The Title',
 
                    pr_url='http://pr.org/7',
 
                    pr_target_repo='http://mainline.com/repo',
 
                    pr_target_branch='trunk',
 
                    pr_source_repo='https://dev.org/repo',
 
                    pr_source_branch='devbranch',
 
                    pr_owner=User.get(self.u2),
 
                    pr_owner_username='u2'
 
                    )
 

	
 
                for type_, body, kwargs in [
 
                        (Notification.TYPE_CHANGESET_COMMENT,
 
                        (NotificationModel.TYPE_CHANGESET_COMMENT,
 
                         u'This is the new \'comment\'.\n\n - and here it ends indented.',
 
                         dict(
 
                            short_id='cafe1234',
 
                            raw_id='cafe1234c0ffeecafe',
 
                            branch='brunch',
 
                            cs_comment_user='Opinionated User (jsmith)',
 
                            cs_comment_url='http://comment.org',
 
                            is_mention=[False, True],
 
                            message='This changeset did something clever which is hard to explain',
 
                            message_short='This changeset did something cl...',
 
                            status_change=[None, 'Approved'],
 
                            cs_target_repo='http://example.com/repo_target',
 
                            cs_url='http://changeset.com',
 
                            cs_author=User.get(self.u2))),
 
                        (Notification.TYPE_MESSAGE,
 
                        (NotificationModel.TYPE_MESSAGE,
 
                         u'This is the \'body\' of the "test" message\n - nothing interesting here except indentation.',
 
                         dict()),
 
                        #(Notification.TYPE_MENTION, '$body', None), # not used
 
                        (Notification.TYPE_REGISTRATION,
 
                        #(NotificationModel.TYPE_MENTION, '$body', None), # not used
 
                        (NotificationModel.TYPE_REGISTRATION,
 
                         u'Registration body',
 
                         dict(
 
                            new_username='newbie',
 
                            registered_user_url='http://newbie.org',
 
                            new_email='new@email.com',
 
                            new_full_name='New Full Name')),
 
                        (Notification.TYPE_PULL_REQUEST,
 
                        (NotificationModel.TYPE_PULL_REQUEST,
 
                         u'This PR is \'awesome\' because it does <stuff>\n - please approve indented!',
 
                         dict(
 
                            pr_user_created='Requesting User (root)', # pr_owner should perhaps be used for @mention in description ...
 
                            is_mention=[False, True],
 
                            pr_revisions=[('123abc'*7, "Introduce one and two\n\nand that's it"), ('567fed'*7, 'Make one plus two equal tree')],
 
                            org_repo_name='repo_org',
 
                            **pr_kwargs)),
 
                        (Notification.TYPE_PULL_REQUEST_COMMENT,
 
                        (NotificationModel.TYPE_PULL_REQUEST_COMMENT,
 
                         u'Me too!\n\n - and indented on second line',
 
                         dict(
 
                            closing_pr=[False, True],
 
                            is_mention=[False, True],
 
                            pr_comment_user='Opinionated User (jsmith)',
 
                            pr_comment_url='http://pr.org/comment',
 
                            status_change=[None, 'Under Review'],
 
                            **pr_kwargs)),
 
                        ]:
 
                    kwargs['repo_name'] = u'repo/name'
 
                    params = [(type_, type_, body, kwargs)]
 
                    for param_name in ['is_mention', 'status_change', 'closing_pr']: # TODO: inline/general
 
                        if not isinstance(kwargs.get(param_name), list):
 
                            continue
 
                        new_params = []
 
                        for v in kwargs[param_name]:
 
                            for desc, type_, body, kwargs in params:
 
                                kwargs = dict(kwargs)
 
                                kwargs[param_name] = v
 
                                new_params.append(('%s, %s=%r' % (desc, param_name, v), type_, body, kwargs))
 
                        params = new_params
 

	
 
                    for desc, type_, body, kwargs in params:
 
                        # desc is used as "global" variable
 
                        NotificationModel().create(created_by=self.u1,
 
                                                           subject=u'unused', body=body, email_kwargs=kwargs,
 
                                                           recipients=[self.u2], type_=type_)
 

	
 
                # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
 
                desc = 'TYPE_PASSWORD_RESET'
 
                kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
 
                kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
 
                    "Password reset link",
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
 
                    author=User.get(self.u1))
 

	
 
        out = '<!doctype html>\n<html lang="en">\n<head><title>Notifications</title><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head>\n<body>\n%s\n</body>\n</html>\n' % \
 
            re.sub(r'<(/?(?:!doctype|html|head|title|meta|body)\b[^>]*)>', r'<!--\1-->', ''.join(l))
 

	
 
        outfn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.out.html')
 
        reffn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.ref.html')
 
        with open(outfn, 'w') as f:
 
            f.write(out)
 
        with open(reffn) as f:
 
            ref = f.read()
 
        assert ref == out # copy test_dump_html_mails.out.html to test_dump_html_mails.ref.html to update expectations
 
        os.unlink(outfn)
0 comments (0 inline, 0 general)