Changeset - cd6c577ade97
[Not reviewed]
default
0 4 0
Thomas De Schampheleire - 7 years ago 2018-12-05 21:27:58
thomas.de_schampheleire@nokia.com
model: remove UI notification feature

This commit is part of the removal of the UI notification feature from
Kallithea, which is not deemed useful in its current form. Only email
notifications are preserved.
4 files changed with 13 insertions and 213 deletions:
0 comments (0 inline, 0 general)
docs/api/models.rst
Show inline comments
 
.. _models:
 

	
 
========================
 
The :mod:`models` module
 
========================
 

	
 
.. automodule:: kallithea.model
 
   :members:
 

	
 
.. automodule:: kallithea.model.comment
 
   :members:
 

	
 
.. automodule:: kallithea.model.notification
 
   :members:
 

	
 
.. automodule:: kallithea.model.permission
 
   :members:
 

	
 
.. automodule:: kallithea.model.repo_permission
 
   :members:
 

	
 
.. automodule:: kallithea.model.repo
 
   :members:
 

	
 
.. automodule:: kallithea.model.repo_group
 
   :members:
 

	
 
.. automodule:: kallithea.model.scm
 
   :members:
 

	
 
.. automodule:: kallithea.model.user
 
   :members:
 

	
 
.. automodule:: kallithea.model.user_group
 
   :members:
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import time
 
import logging
 
import datetime
 
import traceback
 
import hashlib
 
import collections
 
import functools
 

	
 
import sqlalchemy
 
from sqlalchemy import *
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import relationship, joinedload, class_mapper, validates
 
from beaker.cache import cache_region, region_invalidate
 
from webob.exc import HTTPNotFound
 

	
 
from tg.i18n import lazy_ugettext as _
 

	
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea.lib.utils2 import str2bool, safe_str, get_changeset_safe, \
 
    safe_unicode, remove_prefix, time_to_datetime, aslist, Optional, safe_int, \
 
    get_clone_url, urlreadable
 
from kallithea.lib.compat import json
 
from kallithea.lib.caching_query import FromCache
 

	
 
from kallithea.model.meta import Base, Session
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
_hash_key = lambda k: hashlib.md5(safe_str(k)).hexdigest()
 

	
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.iteritems():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        return [
 
            (k, getattr(self, k))
 
            for k in self._get_keys()
 
        ]
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, (int, long)) or safe_str(value).isdigit():
 
            return cls.get(value)
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        Session().delete(obj)
 

	
 
    def __repr__(self):
 
        if hasattr(self, '__unicode__'):
 
            # python repr needs to return str
 
            try:
 
                return safe_str(self.__unicode__())
 
            except UnicodeDecodeError:
 
                pass
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'mysql_charset': 'utf8',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_str,
 
        'int': safe_int,
 
        'unicode': safe_unicode,
 
        'bool': str2bool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 
    DEFAULT_UPDATE_URL = ''
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert type(val) == unicode
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use unicode in app_settings_value
 

	
 
        :param val:
 
        """
 
        self._app_settings_value = safe_unicode(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (self.SETTINGS_TYPES.keys(), val))
 
        self._app_settings_type = val
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s[%s]')>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_value, self.app_settings_type
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set. Optional instance will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            val = Optional.extract(val)
 
            type = Optional.extract(type)
 
            res = cls(key, val, type)
 
            Session().add(res)
 
        else:
 
            res.app_settings_name = key
 
            if not isinstance(val, Optional):
 
                # update if set
 
                res.app_settings_value = val
 
            if not isinstance(type, Optional):
 
                # update if set
 
                res.app_settings_type = type
 
        return res
 

	
 
    @classmethod
 
    def get_app_settings(cls, cache=False):
 

	
 
        ret = cls.query()
 

	
 
        if cache:
 
            ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
 

	
 
        if ret is None:
 
            raise Exception('Could not get application settings !')
 
        settings = {}
 
        for each in ret:
 
            settings[each.app_settings_name] = \
 
                each.app_settings_value
 

	
 
        return settings
 

	
 
    @classmethod
 
    def get_auth_settings(cls, cache=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('auth_')).all()
 
        fd = {}
 
        for row in ret:
 
            fd[row.app_settings_name] = row.app_settings_value
 
        return fd
 

	
 
    @classmethod
 
    def get_default_repo_settings(cls, cache=False, strip_prefix=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('default_')).all()
 
        fd = {}
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import pkg_resources
 
        import platform
 
        import kallithea
 
        from kallithea.lib.utils import check_git_version
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': safe_unicode(platform.platform()),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': safe_unicode(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        # FIXME: ui_key as key is wrong and should be removed when the corresponding
 
        # Ui.get_by_key has been replaced by the composite key
 
        UniqueConstraint('ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.update'
 
    HOOK_REPO_SIZE = 'changegroup.repo_size'
 
    HOOK_PUSH_LOG = 'changegroup.push_logger'
 
    HOOK_PUSH_LOCK = 'prechangegroup.push_lock_handling'
 
    HOOK_PULL_LOG = 'outgoing.pull_logger'
 
    HOOK_PULL_LOCK = 'preoutgoing.pull_lock_handling'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
 
                                     cls.HOOK_PUSH_LOG, cls.HOOK_PUSH_LOCK,
 
                                     cls.HOOK_PULL_LOG, cls.HOOK_PULL_LOCK]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
 
                                      cls.HOOK_PUSH_LOG, cls.HOOK_PUSH_LOCK,
 
                                      cls.HOOK_PULL_LOG, cls.HOOK_PULL_LOCK]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s[%s]%s=>%s]>' % (self.__class__.__name__, self.ui_section,
 
                                    self.ui_key, self.ui_value)
 

	
 

	
 
class User(Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    inherit_default_permissions = Column(Boolean(), nullable=False, default=True)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    notifications = relationship('UserNotification', cascade='all')
 
    # notifications assigned to this user
 
    user_created_notifications = relationship('Notification', cascade='all')
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @hybrid_property
 
    def is_default_user(self):
 
        return self.username == User.DEFAULT_USER
 

	
 
    @property
 
    def AuthUser(self):
 
        """
 
        Returns instance of AuthUser for this user
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        return AuthUser(dbuser=self)
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.user_id, self.username)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(User, cls).guess_instance(value, User.get_by_username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseDbModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if not allow_default and user.is_default_user:
 
            raise DefaultUserException()
 
        return user
 

	
 
    @classmethod
 
    def get_by_username_or_email(cls, username_or_email, case_insensitive=False, cache=False):
 
        """
 
        For anything that looks like an email address, look up by the email address (matching
 
        case insensitively).
 
        For anything else, try to look up by the user name.
 

	
 
        This assumes no normal username can have '@' symbol.
 
        """
 
        if '@' in username_or_email:
 
            return User.get_by_email(username_or_email, cache=cache)
 
        else:
 
            return User.get_by_username(username_or_email, case_insensitive=case_insensitive, cache=cache)
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False, cache=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.username) == func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 

	
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(username)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, cache=False, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_api_key_%s" % api_key))
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            # fallback to additional keys
 
            _res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
 
            if _res:
 
                res = _res.user
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(func.lower(cls.email) == func.lower(email))
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_email_key_%s" % email))
 

	
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(func.lower(UserEmailMap.email) == func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls, cache=False):
 
        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            emails=user.emails,
 
            active=user.active,
 
            admin=user.admin,
 
        )
 
        if details:
 
            data.update(dict(
 
                extern_type=user.extern_type,
 
                extern_name=user.extern_name,
 
                api_key=user.api_key,
 
                api_keys=user.api_keys,
 
                last_login=user.last_login,
 
                ip_addresses=user.ip_addresses
 
                ))
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
            full_name=self.full_name,
 
            full_name_or_username=self.full_name_or_username,
 
            short_contact=self.short_contact,
 
            full_contact=self.full_contact
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 

	
 
class UserApiKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_api_keys'
 
    __table_args__ = (
 
        Index('uak_api_key_idx', 'api_key'),
 
        Index('uak_api_key_expires_idx', 'api_key', 'expires'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    user_api_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    api_key = Column(String(255), nullable=False, unique=True)
 
    description = Column(UnicodeText(), nullable=False)
 
    expires = Column(Float(53), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.expires != -1) & (time.time() > self.expires)
 

	
 

	
 
class UserEmailMap(Base, BaseDbModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        Index('uem_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    email_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _email = Column("email", String(255), nullable=False, unique=True)
 
    user = relationship('User')
 

	
 
    @validates('_email')
 
    def validate_email(self, key, email):
 
        # check if this email is not main one
 
        main_email = Session().query(User).filter(User.email == email).scalar()
 
        if main_email is not None:
 
            raise AttributeError('email %s is present is user table' % email)
 
        return email
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 

	
 
class UserIpMap(Base, BaseDbModel):
 
    __tablename__ = 'user_ip_map'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'ip_addr'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {}
 

	
 
    ip_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    ip_addr = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    user = relationship('User')
 

	
 
    @classmethod
 
    def _get_ip_range(cls, ip_addr):
 
        from kallithea.lib import ipaddr
 
        net = ipaddr.IPNetwork(address=ip_addr)
 
        return [str(net.network), str(net.broadcast)]
 

	
 
    def __json__(self):
 
        return dict(
 
          ip_addr=self.ip_addr,
 
          ip_range=self._get_ip_range(self.ip_addr)
 
        )
 

	
 
    def __unicode__(self):
 
        return u"<%s('user_id:%s=>%s')>" % (self.__class__.__name__,
 
                                            self.user_id, self.ip_addr)
 

	
 

	
 
class UserLog(Base, BaseDbModel):
 
    __tablename__ = 'user_logs'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    user_log_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
 
    username = Column(String(255), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    repository_name = Column(Unicode(255), nullable=False)
 
    user_ip = Column(String(255), nullable=True)
 
    action = Column(UnicodeText(), nullable=False)
 
    action_date = Column(DateTime(timezone=False), nullable=False)
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.repository_name,
 
                                      self.action)
 

	
 
    @property
 
    def action_as_day(self):
 
        return datetime.date(*self.action_date.timetuple()[:3])
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository', cascade='')
 

	
 

	
 
class UserGroup(Base, BaseDbModel):
 
    __tablename__ = 'users_groups'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_id = Column(Integer(), primary_key=True)
 
    users_group_name = Column(Unicode(255), nullable=False, unique=True)
 
    user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
 
    users_group_active = Column(Boolean(), nullable=False)
 
    inherit_default_permissions = Column("users_group_inherit_default_permissions", Boolean(), nullable=False, default=True)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _group_data = Column("group_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    members = relationship('UserGroupMember', cascade="all, delete-orphan")
 
    users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
 
    users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    user_user_group_to_perm = relationship('UserUserGroupToPerm ', cascade='all')
 
    user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm ', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def group_data(self):
 
        if not self._group_data:
 
            return {}
 

	
 
        try:
 
            return json.loads(self._group_data)
 
        except TypeError:
 
            return {}
 

	
 
    @group_data.setter
 
    def group_data(self, val):
 
        try:
 
            self._group_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.users_group_id,
 
                                      self.users_group_name)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False,
 
                          case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(func.lower(cls.users_group_name) == func.lower(group_name))
 
        else:
 
            q = cls.query().filter(cls.users_group_name == group_name)
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get(cls, user_group_id, cache=False):
 
        user_group = cls.query()
 
        if cache:
 
            user_group = user_group.options(FromCache("sql_cache_short",
 
                                    "get_users_group_%s" % user_group_id))
 
        return user_group.get(user_group_id)
 

	
 
    def get_api_data(self, with_members=True):
 
        user_group = self
 

	
 
        data = dict(
 
            users_group_id=user_group.users_group_id,
 
            group_name=user_group.users_group_name,
 
            group_description=user_group.user_group_description,
 
            active=user_group.users_group_active,
 
            owner=user_group.owner.username,
 
        )
 
        if with_members:
 
            data['members'] = [
 
                ugm.user.get_api_data()
 
                for ugm in user_group.members
 
            ]
 

	
 
        return data
 

	
 

	
 
class UserGroupMember(Base, BaseDbModel):
 
    __tablename__ = 'users_groups_members'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_member_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    users_group = relationship('UserGroup')
 

	
 
    def __init__(self, gr_id='', u_id=''):
 
        self.users_group_id = gr_id
 
        self.user_id = u_id
 

	
 

	
 
class RepositoryField(Base, BaseDbModel):
 
    __tablename__ = 'repositories_fields'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'field_key'),  # no-multi field
 
        _table_args_default_dict,
 
    )
 

	
 
    PREFIX = 'ex_'  # prefix used in form to not conflict with already existing fields
 

	
 
    repo_field_id = Column(Integer(), primary_key=True)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    field_key = Column(String(250), nullable=False)
 
    field_label = Column(String(1024), nullable=False)
 
    field_value = Column(String(10000), nullable=False)
 
    field_desc = Column(String(1024), nullable=False)
 
    field_type = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    repository = relationship('Repository')
 

	
 
    @property
 
    def field_key_prefixed(self):
 
        return 'ex_%s' % self.field_key
 

	
 
    @classmethod
 
    def un_prefix_key(cls, key):
 
        if key.startswith(cls.PREFIX):
 
            return key[len(cls.PREFIX):]
 
        return key
 

	
 
    @classmethod
 
    def get_by_key_name(cls, key, repo):
 
        row = cls.query() \
 
                .filter(cls.repository == repo) \
 
                .filter(cls.field_key == key).scalar()
 
        return row
 

	
 

	
 
class Repository(Base, BaseDbModel):
 
    __tablename__ = 'repositories'
 
    __table_args__ = (
 
        Index('r_repo_name_idx', 'repo_name'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_CLONE_URI = '{scheme}://{user}@{netloc}/{repo}'
 
    DEFAULT_CLONE_URI_ID = '{scheme}://{user}@{netloc}/_{repoid}'
 

	
 
    STATE_CREATED = u'repo_state_created'
 
    STATE_PENDING = u'repo_state_pending'
 
    STATE_ERROR = u'repo_state_error'
 

	
 
    repo_id = Column(Integer(), primary_key=True)
 
    repo_name = Column(Unicode(255), nullable=False, unique=True)
 
    repo_state = Column(String(255), nullable=False)
 

	
 
    clone_uri = Column(String(255), nullable=True) # FIXME: not nullable?
 
    repo_type = Column(String(255), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    private = Column(Boolean(), nullable=False)
 
    enable_statistics = Column("statistics", Boolean(), nullable=False, default=True)
 
    enable_downloads = Column("downloads", Boolean(), nullable=False, default=True)
 
    description = Column(Unicode(10000), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _landing_revision = Column("landing_revision", String(255), nullable=False)
 
    enable_locking = Column(Boolean(), nullable=False, default=False)
 
    _locked = Column("locked", String(255), nullable=True) # FIXME: not nullable?
 
    _changeset_cache = Column("changeset_cache", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
 

	
 
    fork_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=True)
 

	
 
    owner = relationship('User')
 
    fork = relationship('Repository', remote_side=repo_id)
 
    group = relationship('RepoGroup')
 
    repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
 
    users_group_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    stats = relationship('Statistics', cascade='all', uselist=False)
 

	
 
    followers = relationship('UserFollowing',
 
                             primaryjoin='UserFollowing.follows_repository_id==Repository.repo_id',
 
                             cascade='all')
 
    extra_fields = relationship('RepositoryField',
 
                                cascade="all, delete-orphan")
 

	
 
    logs = relationship('UserLog')
 
    comments = relationship('ChangesetComment', cascade="all, delete-orphan")
 

	
 
    pull_requests_org = relationship('PullRequest',
 
                    primaryjoin='PullRequest.org_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    pull_requests_other = relationship('PullRequest',
 
                    primaryjoin='PullRequest.other_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
 
                                   safe_unicode(self.repo_name))
 

	
 
    @hybrid_property
 
    def landing_rev(self):
 
        # always should return [rev_type, rev]
 
        if self._landing_revision:
 
            _rev_info = self._landing_revision.split(':')
 
            if len(_rev_info) < 2:
 
                _rev_info.insert(0, 'rev')
 
            return [_rev_info[0], _rev_info[1]]
 
        return [None, None]
 

	
 
    @landing_rev.setter
 
    def landing_rev(self, val):
 
        if ':' not in val:
 
            raise ValueError('value must be delimited with `:` and consist '
 
                             'of <rev_type>:<rev>, got %s instead' % val)
 
        self._landing_revision = val
 

	
 
    @hybrid_property
 
    def locked(self):
 
        # always should return [user_id, timelocked]
 
        if self._locked:
 
            _lock_info = self._locked.split(':')
 
            return int(_lock_info[0]), _lock_info[1]
 
        return [None, None]
 

	
 
    @locked.setter
 
    def locked(self, val):
 
        if val and isinstance(val, (list, tuple)):
 
            self._locked = ':'.join(map(str, val))
 
        else:
 
            self._locked = None
 

	
 
    @hybrid_property
 
    def changeset_cache(self):
 
        try:
 
            cs_cache = json.loads(self._changeset_cache) # might raise on bad data
 
            cs_cache['raw_id'] # verify data, raise exception on error
 
            return cs_cache
 
        except (TypeError, KeyError, ValueError):
 
            return EmptyChangeset().__json__()
 

	
 
    @changeset_cache.setter
 
    def changeset_cache(self, val):
 
        try:
 
            self._changeset_cache = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    @classmethod
 
    def query(cls, sorted=False):
 
        """Add Repository-specific helpers for common query constructs.
 

	
 
        sorted: if True, apply the default ordering (name, case insensitive).
 
        """
 
        q = super(Repository, cls).query()
 

	
 
        if sorted:
 
            q = q.order_by(func.lower(Repository.repo_name))
 

	
 
        return q
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def normalize_repo_name(cls, repo_name):
 
        """
 
        Normalizes os specific repo_name to the format internally stored inside
 
        database using URL_SEP
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        return cls.url_sep().join(repo_name.split(os.sep))
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(Repository, cls).guess_instance(value, Repository.get_by_repo_name)
 

	
 
    @classmethod
 
    def get_by_repo_name(cls, repo_name, case_insensitive=False):
 
        """Get the repo, defaulting to database case sensitivity.
 
        case_insensitive will be slower and should only be specified if necessary."""
 
        if case_insensitive:
 
            q = Session().query(cls).filter(func.lower(cls.repo_name) == func.lower(repo_name))
 
        else:
 
            q = Session().query(cls).filter(cls.repo_name == repo_name)
 
        q = q.options(joinedload(Repository.fork)) \
 
                .options(joinedload(Repository.owner)) \
 
                .options(joinedload(Repository.group))
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_full_path(cls, repo_full_path):
 
        base_full_path = os.path.realpath(cls.base_path())
 
        repo_full_path = os.path.realpath(repo_full_path)
 
        assert repo_full_path.startswith(base_full_path + os.path.sep)
 
        repo_name = repo_full_path[len(base_full_path) + 1:]
 
        repo_name = cls.normalize_repo_name(repo_name)
 
        return cls.get_by_repo_name(repo_name.strip(URL_SEP))
 

	
 
    @classmethod
 
    def get_repo_forks(cls, repo_id):
 
        return cls.query().filter(Repository.fork_id == repo_id)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path where all repos are stored
 

	
 
        :param cls:
 
        """
 
        q = Session().query(Ui) \
 
            .filter(Ui.ui_key == cls.url_sep())
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return q.one().ui_value
 

	
 
    @property
 
    def forks(self):
 
        """
 
        Return forks of this repo
 
        """
 
        return Repository.get_repo_forks(self.repo_id)
 

	
 
    @property
 
    def parent(self):
 
        """
 
        Returns fork parent
 
        """
 
        return self.fork
 

	
 
    @property
 
    def just_name(self):
 
        return self.repo_name.split(Repository.url_sep())[-1]
 

	
 
    @property
 
    def groups_with_parents(self):
 
        groups = []
 
        group = self.group
 
        while group is not None:
 
            groups.append(group)
 
            group = group.parent_group
 
            assert group not in groups, group # avoid recursion on bad db content
 
        groups.reverse()
 
        return groups
 

	
 
    @LazyProperty
 
    def repo_path(self):
 
        """
 
        Returns base full path for that repository means where it actually
 
        exists on a filesystem
 
        """
 
        q = Session().query(Ui).filter(Ui.ui_key ==
 
                                              Repository.url_sep())
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return q.one().ui_value
 

	
 
    @property
 
    def repo_full_path(self):
 
        p = [self.repo_path]
 
        # we need to split the name by / since this is how we store the
 
        # names in the database, but that eventually needs to be converted
 
        # into a valid system path
 
        p += self.repo_name.split(Repository.url_sep())
 
        return os.path.join(*map(safe_unicode, p))
 

	
 
    @property
 
    def cache_keys(self):
 
        """
 
        Returns associated cache keys for that repo
 
        """
 
        return CacheInvalidation.query() \
 
            .filter(CacheInvalidation.cache_args == self.repo_name) \
 
            .order_by(CacheInvalidation.cache_key) \
 
            .all()
 

	
 
    def get_new_name(self, repo_name):
 
        """
 
        returns new full repository name based on assigned group and new new
 

	
 
        :param group_name:
 
        """
 
@@ -1713,944 +1710,879 @@ class Permission(Base, BaseDbModel):
 
    PERMS = [
 
        ('hg.admin', _('Kallithea Administrator')),
 

	
 
        ('repository.none', _('Default user has no access to new repositories')),
 
        ('repository.read', _('Default user has read access to new repositories')),
 
        ('repository.write', _('Default user has write access to new repositories')),
 
        ('repository.admin', _('Default user has admin access to new repositories')),
 

	
 
        ('group.none', _('Default user has no access to new repository groups')),
 
        ('group.read', _('Default user has read access to new repository groups')),
 
        ('group.write', _('Default user has write access to new repository groups')),
 
        ('group.admin', _('Default user has admin access to new repository groups')),
 

	
 
        ('usergroup.none', _('Default user has no access to new user groups')),
 
        ('usergroup.read', _('Default user has read access to new user groups')),
 
        ('usergroup.write', _('Default user has write access to new user groups')),
 
        ('usergroup.admin', _('Default user has admin access to new user groups')),
 

	
 
        ('hg.repogroup.create.false', _('Only admins can create repository groups')),
 
        ('hg.repogroup.create.true', _('Non-admins can create repository groups')),
 

	
 
        ('hg.usergroup.create.false', _('Only admins can create user groups')),
 
        ('hg.usergroup.create.true', _('Non-admins can create user groups')),
 

	
 
        ('hg.create.none', _('Only admins can create top level repositories')),
 
        ('hg.create.repository', _('Non-admins can create top level repositories')),
 

	
 
        ('hg.create.write_on_repogroup.true', _('Repository creation enabled with write permission to a repository group')),
 
        ('hg.create.write_on_repogroup.false', _('Repository creation disabled with write permission to a repository group')),
 

	
 
        ('hg.fork.none', _('Only admins can fork repositories')),
 
        ('hg.fork.repository', _('Non-admins can fork repositories')),
 

	
 
        ('hg.register.none', _('Registration disabled')),
 
        ('hg.register.manual_activate', _('User registration with manual account activation')),
 
        ('hg.register.auto_activate', _('User registration with automatic account activation')),
 

	
 
        ('hg.extern_activate.manual', _('Manual activation of external account')),
 
        ('hg.extern_activate.auto', _('Automatic activation of external account')),
 
    ]
 

	
 
    # definition of system default permissions for DEFAULT user
 
    DEFAULT_USER_PERMISSIONS = [
 
        'repository.read',
 
        'group.read',
 
        'usergroup.read',
 
        'hg.create.repository',
 
        'hg.create.write_on_repogroup.true',
 
        'hg.fork.repository',
 
        'hg.register.manual_activate',
 
        'hg.extern_activate.auto',
 
    ]
 

	
 
    # defines which permissions are more important higher the more important
 
    # Weight defines which permissions are more important.
 
    # The higher number the more important.
 
    PERM_WEIGHTS = {
 
        'repository.none': 0,
 
        'repository.read': 1,
 
        'repository.write': 3,
 
        'repository.admin': 4,
 

	
 
        'group.none': 0,
 
        'group.read': 1,
 
        'group.write': 3,
 
        'group.admin': 4,
 

	
 
        'usergroup.none': 0,
 
        'usergroup.read': 1,
 
        'usergroup.write': 3,
 
        'usergroup.admin': 4,
 

	
 
        'hg.repogroup.create.false': 0,
 
        'hg.repogroup.create.true': 1,
 

	
 
        'hg.usergroup.create.false': 0,
 
        'hg.usergroup.create.true': 1,
 

	
 
        'hg.fork.none': 0,
 
        'hg.fork.repository': 1,
 

	
 
        'hg.create.none': 0,
 
        'hg.create.repository': 1
 
    }
 

	
 
    permission_id = Column(Integer(), primary_key=True)
 
    permission_name = Column(String(255), nullable=False)
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (
 
            self.__class__.__name__, self.permission_id, self.permission_name
 
        )
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(Permission, cls).guess_instance(value, Permission.get_by_key)
 

	
 
    @classmethod
 
    def get_by_key(cls, key):
 
        return cls.query().filter(cls.permission_name == key).scalar()
 

	
 
    @classmethod
 
    def get_default_perms(cls, default_user_id):
 
        q = Session().query(UserRepoToPerm, Repository, cls) \
 
         .join((Repository, UserRepoToPerm.repository_id == Repository.repo_id)) \
 
         .join((cls, UserRepoToPerm.permission_id == cls.permission_id)) \
 
         .filter(UserRepoToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 
    @classmethod
 
    def get_default_group_perms(cls, default_user_id):
 
        q = Session().query(UserRepoGroupToPerm, RepoGroup, cls) \
 
         .join((RepoGroup, UserRepoGroupToPerm.group_id == RepoGroup.group_id)) \
 
         .join((cls, UserRepoGroupToPerm.permission_id == cls.permission_id)) \
 
         .filter(UserRepoGroupToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 
    @classmethod
 
    def get_default_user_group_perms(cls, default_user_id):
 
        q = Session().query(UserUserGroupToPerm, UserGroup, cls) \
 
         .join((UserGroup, UserUserGroupToPerm.user_group_id == UserGroup.users_group_id)) \
 
         .join((cls, UserUserGroupToPerm.permission_id == cls.permission_id)) \
 
         .filter(UserUserGroupToPerm.user_id == default_user_id)
 

	
 
        return q.all()
 

	
 

	
 
class UserRepoToPerm(Base, BaseDbModel):
 
    __tablename__ = 'repo_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'repository_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    repo_to_perm_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, user, repository, permission):
 
        n = cls()
 
        n.user = user
 
        n.repository = repository
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<%s => %s >' % (self.user, self.repository)
 

	
 

	
 
class UserUserGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'user_user_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'user_group_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_user_group_to_perm_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 
    user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    user_group = relationship('UserGroup')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, user, user_group, permission):
 
        n = cls()
 
        n.user = user
 
        n.user_group = user_group
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<%s => %s >' % (self.user, self.user_group)
 

	
 

	
 
class UserToPerm(Base, BaseDbModel):
 
    __tablename__ = 'user_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_to_perm_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    permission = relationship('Permission')
 

	
 
    def __unicode__(self):
 
        return u'<%s => %s >' % (self.user, self.permission)
 

	
 

	
 
class UserGroupRepoToPerm(Base, BaseDbModel):
 
    __tablename__ = 'users_group_repo_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'users_group_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_to_perm_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 

	
 
    users_group = relationship('UserGroup')
 
    permission = relationship('Permission')
 
    repository = relationship('Repository')
 

	
 
    @classmethod
 
    def create(cls, users_group, repository, permission):
 
        n = cls()
 
        n.users_group = users_group
 
        n.repository = repository
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<UserGroupRepoToPerm:%s => %s >' % (self.users_group, self.repository)
 

	
 

	
 
class UserGroupUserGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'user_group_user_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('target_user_group_id', 'user_group_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_group_user_group_to_perm_id = Column(Integer(), primary_key=True)
 
    target_user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 
    user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 

	
 
    target_user_group = relationship('UserGroup', primaryjoin='UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id')
 
    user_group = relationship('UserGroup', primaryjoin='UserGroupUserGroupToPerm.user_group_id==UserGroup.users_group_id')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, target_user_group, user_group, permission):
 
        n = cls()
 
        n.target_user_group = target_user_group
 
        n.user_group = user_group
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 
    def __unicode__(self):
 
        return u'<UserGroupUserGroup:%s => %s >' % (self.target_user_group, self.user_group)
 

	
 

	
 
class UserGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'users_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('users_group_id', 'permission_id',),
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_to_perm_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 

	
 
    users_group = relationship('UserGroup')
 
    permission = relationship('Permission')
 

	
 

	
 
class UserRepoGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'user_repo_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'group_id', 'permission_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    group_to_perm_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    group = relationship('RepoGroup')
 
    permission = relationship('Permission')
 

	
 
    @classmethod
 
    def create(cls, user, repository_group, permission):
 
        n = cls()
 
        n.user = user
 
        n.group = repository_group
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 

	
 
class UserGroupRepoGroupToPerm(Base, BaseDbModel):
 
    __tablename__ = 'users_group_repo_group_to_perm'
 
    __table_args__ = (
 
        UniqueConstraint('users_group_id', 'group_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_repo_group_to_perm_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=False)
 
    permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
 

	
 
    users_group = relationship('UserGroup')
 
    permission = relationship('Permission')
 
    group = relationship('RepoGroup')
 

	
 
    @classmethod
 
    def create(cls, user_group, repository_group, permission):
 
        n = cls()
 
        n.users_group = user_group
 
        n.group = repository_group
 
        n.permission = permission
 
        Session().add(n)
 
        return n
 

	
 

	
 
class Statistics(Base, BaseDbModel):
 
    __tablename__ = 'statistics'
 
    __table_args__ = (
 
         _table_args_default_dict,
 
    )
 

	
 
    stat_id = Column(Integer(), primary_key=True)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=True)
 
    stat_on_revision = Column(Integer(), nullable=False)
 
    commit_activity = Column(LargeBinary(1000000), nullable=False) # JSON data
 
    commit_activity_combined = Column(LargeBinary(), nullable=False) # JSON data
 
    languages = Column(LargeBinary(1000000), nullable=False) # JSON data
 

	
 
    repository = relationship('Repository', single_parent=True)
 

	
 

	
 
class UserFollowing(Base, BaseDbModel):
 
    __tablename__ = 'user_followings'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'follows_repository_id', name='uq_user_followings_user_repo'),
 
        UniqueConstraint('user_id', 'follows_user_id', name='uq_user_followings_user_user'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_following_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    follows_repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    follows_user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
 
    follows_from = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User', primaryjoin='User.user_id==UserFollowing.user_id')
 

	
 
    follows_user = relationship('User', primaryjoin='User.user_id==UserFollowing.follows_user_id')
 
    follows_repository = relationship('Repository', order_by=lambda: func.lower(Repository.repo_name))
 

	
 
    @classmethod
 
    def get_repo_followers(cls, repo_id):
 
        return cls.query().filter(cls.follows_repository_id == repo_id)
 

	
 

	
 
class CacheInvalidation(Base, BaseDbModel):
 
    __tablename__ = 'cache_invalidation'
 
    __table_args__ = (
 
        Index('key_idx', 'cache_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    # cache_id, not used
 
    cache_id = Column(Integer(), primary_key=True)
 
    # cache_key as created by _get_cache_key
 
    cache_key = Column(Unicode(255), nullable=False, unique=True)
 
    # cache_args is a repo_name
 
    cache_args = Column(Unicode(255), nullable=False)
 
    # instance sets cache_active True when it is caching, other instances set
 
    # cache_active to False to indicate that this cache is invalid
 
    cache_active = Column(Boolean(), nullable=False, default=False)
 

	
 
    def __init__(self, cache_key, repo_name=''):
 
        self.cache_key = cache_key
 
        self.cache_args = repo_name
 
        self.cache_active = False
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s[%s]')>" % (
 
            self.__class__.__name__,
 
            self.cache_id, self.cache_key, self.cache_active)
 

	
 
    def _cache_key_partition(self):
 
        prefix, repo_name, suffix = self.cache_key.partition(self.cache_args)
 
        return prefix, repo_name, suffix
 

	
 
    def get_prefix(self):
 
        """
 
        get prefix that might have been used in _get_cache_key to
 
        generate self.cache_key. Only used for informational purposes
 
        in repo_edit.html.
 
        """
 
        # prefix, repo_name, suffix
 
        return self._cache_key_partition()[0]
 

	
 
    def get_suffix(self):
 
        """
 
        get suffix that might have been used in _get_cache_key to
 
        generate self.cache_key. Only used for informational purposes
 
        in repo_edit.html.
 
        """
 
        # prefix, repo_name, suffix
 
        return self._cache_key_partition()[2]
 

	
 
    @classmethod
 
    def clear_cache(cls):
 
        """
 
        Delete all cache keys from database.
 
        Should only be run when all instances are down and all entries thus stale.
 
        """
 
        cls.query().delete()
 
        Session().commit()
 

	
 
    @classmethod
 
    def _get_cache_key(cls, key):
 
        """
 
        Wrapper for generating a unique cache key for this instance and "key".
 
        key must / will start with a repo_name which will be stored in .cache_args .
 
        """
 
        import kallithea
 
        prefix = kallithea.CONFIG.get('instance_id', '')
 
        return "%s%s" % (prefix, key)
 

	
 
    @classmethod
 
    def set_invalidate(cls, repo_name):
 
        """
 
        Mark all caches of a repo as invalid in the database.
 
        """
 
        inv_objs = Session().query(cls).filter(cls.cache_args == repo_name).all()
 
        log.debug('for repo %s got %s invalidation objects',
 
                  safe_str(repo_name), inv_objs)
 

	
 
        for inv_obj in inv_objs:
 
            log.debug('marking %s key for invalidation based on repo_name=%s',
 
                      inv_obj, safe_str(repo_name))
 
            Session().delete(inv_obj)
 
        Session().commit()
 

	
 
    @classmethod
 
    def test_and_set_valid(cls, repo_name, kind, valid_cache_keys=None):
 
        """
 
        Mark this cache key as active and currently cached.
 
        Return True if the existing cache registration still was valid.
 
        Return False to indicate that it had been invalidated and caches should be refreshed.
 
        """
 

	
 
        key = (repo_name + '_' + kind) if kind else repo_name
 
        cache_key = cls._get_cache_key(key)
 

	
 
        if valid_cache_keys and cache_key in valid_cache_keys:
 
            return True
 

	
 
        inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
        if inv_obj is None:
 
            inv_obj = cls(cache_key, repo_name)
 
            Session().add(inv_obj)
 
        elif inv_obj.cache_active:
 
            return True
 
        inv_obj.cache_active = True
 
        try:
 
            Session().commit()
 
        except sqlalchemy.exc.IntegrityError:
 
            log.error('commit of CacheInvalidation failed - retrying')
 
            Session().rollback()
 
            inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
            if inv_obj is None:
 
                log.error('failed to create CacheInvalidation entry')
 
                # TODO: fail badly?
 
            # else: TOCTOU - another thread added the key at the same time; no further action required
 
        return False
 

	
 
    @classmethod
 
    def get_valid_cache_keys(cls):
 
        """
 
        Return opaque object with information of which caches still are valid
 
        and can be used without checking for invalidation.
 
        """
 
        return set(inv_obj.cache_key for inv_obj in cls.query().filter(cls.cache_active).all())
 

	
 

	
 
class ChangesetComment(Base, BaseDbModel):
 
    __tablename__ = 'changeset_comments'
 
    __table_args__ = (
 
        Index('cc_revision_idx', 'revision'),
 
        Index('cc_pull_request_id_idx', 'pull_request_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    comment_id = Column(Integer(), primary_key=True)
 
    repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    revision = Column(String(40), nullable=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
 
    line_no = Column(Unicode(10), nullable=True)
 
    f_path = Column(Unicode(1000), nullable=True)
 
    author_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    text = Column(UnicodeText(), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    author = relationship('User')
 
    repo = relationship('Repository')
 
    # status_change is frequently used directly in templates - make it a lazy
 
    # join to avoid fetching each related ChangesetStatus on demand.
 
    # There will only be one ChangesetStatus referencing each comment so the join will not explode.
 
    status_change = relationship('ChangesetStatus',
 
                                 cascade="all, delete-orphan", lazy='joined')
 
    pull_request = relationship('PullRequest')
 

	
 
    def url(self):
 
        anchor = "comment-%s" % self.comment_id
 
        import kallithea.lib.helpers as h
 
        if self.revision:
 
            return h.url('changeset_home', repo_name=self.repo.repo_name, revision=self.revision, anchor=anchor)
 
        elif self.pull_request_id is not None:
 
            return self.pull_request.url(anchor=anchor)
 

	
 
    def __json__(self):
 
        return dict(
 
            comment_id=self.comment_id,
 
            username=self.author.username,
 
            text=self.text,
 
        )
 

	
 
    def deletable(self):
 
        return self.created_on > datetime.datetime.now() - datetime.timedelta(minutes=5)
 

	
 

	
 
class ChangesetStatus(Base, BaseDbModel):
 
    __tablename__ = 'changeset_statuses'
 
    __table_args__ = (
 
        Index('cs_revision_idx', 'revision'),
 
        Index('cs_version_idx', 'version'),
 
        Index('cs_pull_request_id_idx', 'pull_request_id'),
 
        Index('cs_changeset_comment_id_idx', 'changeset_comment_id'),
 
        Index('cs_pull_request_id_user_id_version_idx', 'pull_request_id', 'user_id', 'version'),
 
        Index('cs_repo_id_pull_request_id_idx', 'repo_id', 'pull_request_id'),
 
        UniqueConstraint('repo_id', 'revision', 'version'),
 
        _table_args_default_dict,
 
    )
 

	
 
    STATUS_NOT_REVIEWED = DEFAULT = 'not_reviewed'
 
    STATUS_APPROVED = 'approved'
 
    STATUS_REJECTED = 'rejected' # is shown as "Not approved" - TODO: change database content / scheme
 
    STATUS_UNDER_REVIEW = 'under_review'
 

	
 
    STATUSES = [
 
        (STATUS_NOT_REVIEWED, _("Not reviewed")),  # (no icon) and default
 
        (STATUS_UNDER_REVIEW, _("Under review")),
 
        (STATUS_REJECTED, _("Not approved")),
 
        (STATUS_APPROVED, _("Approved")),
 
    ]
 
    STATUSES_DICT = dict(STATUSES)
 

	
 
    changeset_status_id = Column(Integer(), primary_key=True)
 
    repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    revision = Column(String(40), nullable=True)
 
    status = Column(String(128), nullable=False, default=DEFAULT)
 
    comment_id = Column('changeset_comment_id', Integer(), ForeignKey('changeset_comments.comment_id'), nullable=False)
 
    modified_at = Column(DateTime(), nullable=False, default=datetime.datetime.now)
 
    version = Column(Integer(), nullable=False, default=0)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
 

	
 
    author = relationship('User')
 
    repo = relationship('Repository')
 
    comment = relationship('ChangesetComment')
 
    pull_request = relationship('PullRequest')
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (
 
            self.__class__.__name__,
 
            self.status, self.author
 
        )
 

	
 
    @classmethod
 
    def get_status_lbl(cls, value):
 
        return cls.STATUSES_DICT.get(value)
 

	
 
    @property
 
    def status_lbl(self):
 
        return ChangesetStatus.get_status_lbl(self.status)
 

	
 
    def __json__(self):
 
        return dict(
 
            status=self.status,
 
            modified_at=self.modified_at.replace(microsecond=0),
 
            reviewer=self.author.username,
 
            )
 

	
 

	
 
class PullRequest(Base, BaseDbModel):
 
    __tablename__ = 'pull_requests'
 
    __table_args__ = (
 
        Index('pr_org_repo_id_idx', 'org_repo_id'),
 
        Index('pr_other_repo_id_idx', 'other_repo_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    # values for .status
 
    STATUS_NEW = u'new'
 
    STATUS_CLOSED = u'closed'
 

	
 
    pull_request_id = Column(Integer(), primary_key=True)
 
    title = Column(Unicode(255), nullable=False)
 
    description = Column(UnicodeText(), nullable=False)
 
    status = Column(Unicode(255), nullable=False, default=STATUS_NEW) # only for closedness, not approve/reject/etc
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _revisions = Column('revisions', UnicodeText(), nullable=False)
 
    org_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    org_ref = Column(Unicode(255), nullable=False)
 
    other_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    other_ref = Column(Unicode(255), nullable=False)
 

	
 
    @hybrid_property
 
    def revisions(self):
 
        return self._revisions.split(':')
 

	
 
    @revisions.setter
 
    def revisions(self, val):
 
        self._revisions = safe_unicode(':'.join(val))
 

	
 
    @property
 
    def org_ref_parts(self):
 
        return self.org_ref.split(':')
 

	
 
    @property
 
    def other_ref_parts(self):
 
        return self.other_ref.split(':')
 

	
 
    owner = relationship('User')
 
    reviewers = relationship('PullRequestReviewer',
 
                             cascade="all, delete-orphan")
 
    org_repo = relationship('Repository', primaryjoin='PullRequest.org_repo_id==Repository.repo_id')
 
    other_repo = relationship('Repository', primaryjoin='PullRequest.other_repo_id==Repository.repo_id')
 
    statuses = relationship('ChangesetStatus', order_by='ChangesetStatus.changeset_status_id')
 
    comments = relationship('ChangesetComment', order_by='ChangesetComment.comment_id',
 
                             cascade="all, delete-orphan")
 

	
 
    @classmethod
 
    def query(cls, reviewer_id=None, include_closed=True, sorted=False):
 
        """Add PullRequest-specific helpers for common query constructs.
 

	
 
        reviewer_id: only PRs with the specified user added as reviewer.
 

	
 
        include_closed: if False, do not include closed PRs.
 

	
 
        sorted: if True, apply the default ordering (newest first).
 
        """
 
        q = super(PullRequest, cls).query()
 

	
 
        if reviewer_id is not None:
 
            q = q.join(PullRequestReviewer).filter(PullRequestReviewer.user_id == reviewer_id)
 

	
 
        if not include_closed:
 
            q = q.filter(PullRequest.status != PullRequest.STATUS_CLOSED)
 

	
 
        if sorted:
 
            q = q.order_by(PullRequest.created_on.desc())
 

	
 
        return q
 

	
 
    def get_reviewer_users(self):
 
        """Like .reviewers, but actually returning the users"""
 
        return User.query() \
 
            .join(PullRequestReviewer) \
 
            .filter(PullRequestReviewer.pull_request == self) \
 
            .order_by(PullRequestReviewer.pull_request_reviewers_id) \
 
            .all()
 

	
 
    def is_closed(self):
 
        return self.status == self.STATUS_CLOSED
 

	
 
    def user_review_status(self, user_id):
 
        """Return the user's latest status votes on PR"""
 
        # note: no filtering on repo - that would be redundant
 
        status = ChangesetStatus.query() \
 
            .filter(ChangesetStatus.pull_request == self) \
 
            .filter(ChangesetStatus.user_id == user_id) \
 
            .order_by(ChangesetStatus.version) \
 
            .first()
 
        return str(status.status) if status else ''
 

	
 
    @classmethod
 
    def make_nice_id(cls, pull_request_id):
 
        '''Return pull request id nicely formatted for displaying'''
 
        return '#%s' % pull_request_id
 

	
 
    def nice_id(self):
 
        '''Return the id of this pull request, nicely formatted for displaying'''
 
        return self.make_nice_id(self.pull_request_id)
 

	
 
    def get_api_data(self):
 
        return self.__json__()
 

	
 
    def __json__(self):
 
        return dict(
 
            pull_request_id=self.pull_request_id,
 
            url=self.url(),
 
            reviewers=self.reviewers,
 
            revisions=self.revisions,
 
            owner=self.owner.username,
 
            title=self.title,
 
            description=self.description,
 
            org_repo_url=self.org_repo.clone_url(),
 
            org_ref_parts=self.org_ref_parts,
 
            other_ref_parts=self.other_ref_parts,
 
            status=self.status,
 
            comments=self.comments,
 
            statuses=self.statuses,
 
        )
 

	
 
    def url(self, **kwargs):
 
        canonical = kwargs.pop('canonical', None)
 
        import kallithea.lib.helpers as h
 
        b = self.org_ref_parts[1]
 
        if b != self.other_ref_parts[1]:
 
            s = '/_/' + b
 
        else:
 
            s = '/_/' + self.title
 
        kwargs['extra'] = urlreadable(s)
 
        if canonical:
 
            return h.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                                   pull_request_id=self.pull_request_id, **kwargs)
 
        return h.url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                     pull_request_id=self.pull_request_id, **kwargs)
 

	
 

	
 
class PullRequestReviewer(Base, BaseDbModel):
 
    __tablename__ = 'pull_request_reviewers'
 
    __table_args__ = (
 
        Index('pull_request_reviewers_user_id_idx', 'user_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    def __init__(self, user=None, pull_request=None):
 
        self.user = user
 
        self.pull_request = pull_request
 

	
 
    pull_request_reviewers_id = Column('pull_requests_reviewers_id', Integer(), primary_key=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    pull_request = relationship('PullRequest')
 

	
 
    def __json__(self):
 
        return dict(
 
            username=self.user.username if self.user else None,
 
        )
 

	
 

	
 
class Notification(Base, BaseDbModel):
 
class Notification(object):
 
    __tablename__ = 'notifications'
 
    __table_args__ = (
 
        Index('notification_type_idx', 'type'),
 
        _table_args_default_dict,
 
    )
 

	
 
    TYPE_CHANGESET_COMMENT = u'cs_comment'
 
    TYPE_MESSAGE = u'message'
 
    TYPE_MENTION = u'mention' # not used
 
    TYPE_REGISTRATION = u'registration'
 
    TYPE_PULL_REQUEST = u'pull_request'
 
    TYPE_PULL_REQUEST_COMMENT = u'pull_request_comment'
 

	
 
    notification_id = Column(Integer(), primary_key=True)
 
    subject = Column(Unicode(512), nullable=False)
 
    body = Column(UnicodeText(), nullable=False)
 
    created_by = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    type_ = Column('type', Unicode(255), nullable=False)
 

	
 
    created_by_user = relationship('User')
 
    notifications_to_users = relationship('UserNotification', cascade="all, delete-orphan")
 

	
 
    @property
 
    def recipients(self):
 
        return [x.user for x in UserNotification.query()
 
                .filter(UserNotification.notification == self)
 
                .order_by(UserNotification.user_id.asc()).all()]
 

	
 
    @classmethod
 
    def create(cls, created_by, subject, body, recipients, type_=None):
 
        if type_ is None:
 
            type_ = Notification.TYPE_MESSAGE
 

	
 
        notification = cls()
 
        notification.created_by_user = created_by
 
        notification.subject = subject
 
        notification.body = body
 
        notification.type_ = type_
 
        notification.created_on = datetime.datetime.now()
 

	
 
        for recipient in recipients:
 
            un = UserNotification()
 
            un.notification = notification
 
            un.user_id = recipient.user_id
 
            # Mark notifications to self "pre-read" - should perhaps just be skipped
 
            if recipient == created_by:
 
                un.read = True
 
            Session().add(un)
 

	
 
        Session().add(notification)
 
        Session().flush() # assign notification.notification_id
 
        return notification
 

	
 
    @property
 
    def description(self):
 
        from kallithea.model.notification import NotificationModel
 
        return NotificationModel().make_description(self)
 

	
 

	
 
class UserNotification(Base, BaseDbModel):
 

	
 
class UserNotification(object):
 
    __tablename__ = 'user_to_notification'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'notification_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), primary_key=True)
 
    notification_id = Column(Integer(), ForeignKey('notifications.notification_id'), primary_key=True)
 
    read = Column(Boolean, nullable=False, default=False)
 
    sent_on = Column(DateTime(timezone=False), nullable=True) # FIXME: not nullable?
 

	
 
    user = relationship('User')
 
    notification = relationship('Notification')
 

	
 
    def mark_as_read(self):
 
        self.read = True
 

	
 

	
 
class Gist(Base, BaseDbModel):
 
    __tablename__ = 'gists'
 
    __table_args__ = (
 
        Index('g_gist_access_id_idx', 'gist_access_id'),
 
        Index('g_created_on_idx', 'created_on'),
 
        _table_args_default_dict,
 
    )
 

	
 
    GIST_PUBLIC = u'public'
 
    GIST_PRIVATE = u'private'
 
    DEFAULT_FILENAME = u'gistfile1.txt'
 

	
 
    gist_id = Column(Integer(), primary_key=True)
 
    gist_access_id = Column(Unicode(250), nullable=False)
 
    gist_description = Column(UnicodeText(), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    gist_expires = Column(Float(53), nullable=False)
 
    gist_type = Column(Unicode(128), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.gist_expires != -1) & (time.time() > self.gist_expires)
 

	
 
    def __repr__(self):
 
        return '<Gist:[%s]%s>' % (self.gist_type, self.gist_access_id)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(Gist, cls).guess_instance(value, Gist.get_by_access_id)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        res = cls.query().filter(cls.gist_access_id == id_).scalar()
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def get_by_access_id(cls, gist_access_id):
 
        return cls.query().filter(cls.gist_access_id == gist_access_id).scalar()
 

	
 
    def gist_url(self):
 
        import kallithea
 
        alias_url = kallithea.CONFIG.get('gist_alias_url')
 
        if alias_url:
 
            return alias_url.replace('{gistid}', self.gist_access_id)
 

	
 
        import kallithea.lib.helpers as h
 
        return h.canonical_url('gist', gist_id=self.gist_access_id)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path where all gists are stored
 

	
 
        :param cls:
 
        """
 
        from kallithea.model.gist import GIST_STORE_LOC
 
        q = Session().query(Ui) \
 
            .filter(Ui.ui_key == URL_SEP)
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return os.path.join(q.one().ui_value, GIST_STORE_LOC)
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating gist related data for API
 
        """
 
        gist = self
 
        data = dict(
 
            gist_id=gist.gist_id,
 
            type=gist.gist_type,
 
            access_id=gist.gist_access_id,
 
            description=gist.gist_description,
 
            url=gist.gist_url(),
 
            expires=gist.gist_expires,
 
            created_on=gist.created_on,
 
        )
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
        )
 
        data.update(self.get_api_data())
 
        return data
 
    ## SCM functions
 

	
 
    @property
 
    def scm_instance(self):
 
        from kallithea.lib.vcs import get_repo
 
        base_path = self.base_path()
 
        return get_repo(os.path.join(*map(safe_str,
 
                                          [base_path, self.gist_access_id])))
kallithea/model/notification.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.notification
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Model for notifications
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 
import traceback
 

	
 
from tg import tmpl_context as c, app_globals
 
from tg.i18n import ugettext as _
 
from sqlalchemy.orm import joinedload, subqueryload
 

	
 
import kallithea
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import safe_unicode
 
from kallithea.model.db import Notification, User, UserNotification
 
from kallithea.model.db import Notification, User
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class NotificationModel(object):
 

	
 
    def create(self, created_by, subject, body, recipients=None,
 
               type_=Notification.TYPE_MESSAGE, with_email=True,
 
               email_kwargs=None, repo_name=None):
 
        """
 

	
 
        Creates notification of given type
 

	
 
        :param created_by: int, str or User instance. User who created this
 
            notification
 
        :param subject:
 
        :param body:
 
        :param recipients: list of int, str or User objects, when None
 
            is given send to all admins
 
        :param type_: type of notification
 
        :param with_email: send email with this notification
 
        :param email_kwargs: additional dict to pass as args to email template
 
        """
 
        from kallithea.lib.celerylib import tasks
 
        email_kwargs = email_kwargs or {}
 
        if recipients and not getattr(recipients, '__iter__', False):
 
            raise Exception('recipients must be a list or iterable')
 

	
 
        created_by_obj = User.guess_instance(created_by)
 

	
 
        recipients_objs = set()
 
        if recipients:
 
            for u in recipients:
 
                obj = User.guess_instance(u)
 
                if obj is not None:
 
                    recipients_objs.add(obj)
 
                else:
 
                    # TODO: inform user that requested operation couldn't be completed
 
                    log.error('cannot email unknown user %r', u)
 
            log.debug('sending notifications %s to %s',
 
                type_, recipients_objs
 
            )
 
        elif recipients is None:
 
            # empty recipients means to all admins
 
            recipients_objs = User.query().filter(User.admin == True).all()
 
            log.debug('sending notifications %s to admins: %s',
 
                type_, recipients_objs
 
            )
 
        #else: silently skip notification mails?
 

	
 
        # TODO: inform user who are notified
 
        notif = Notification.create(
 
            created_by=created_by_obj, subject=subject,
 
            body=body, recipients=recipients_objs, type_=type_
 
        )
 

	
 
        if not with_email:
 
            return notif
 
            return
 

	
 
        headers = {}
 
        headers['X-Kallithea-Notification-Type'] = type_
 
        if 'threading' in email_kwargs:
 
            headers['References'] = ' '.join('<%s>' % x for x in email_kwargs['threading'])
 

	
 
        # this is passed into template
 
        created_on = h.fmt_date(datetime.datetime.now())
 
        html_kwargs = {
 
                  'subject': subject,
 
                  'body': h.render_w_mentions(body, repo_name),
 
                  'when': h.fmt_date(notif.created_on),
 
                  'user': notif.created_by_user.username,
 
                  'when': created_on,
 
                  'user': created_by_obj.username,
 
                  }
 

	
 
        txt_kwargs = {
 
                  'subject': subject,
 
                  'body': body,
 
                  'when': h.fmt_date(notif.created_on),
 
                  'user': notif.created_by_user.username,
 
                  'when': created_on,
 
                  'user': created_by_obj.username,
 
                  }
 

	
 
        html_kwargs.update(email_kwargs)
 
        txt_kwargs.update(email_kwargs)
 
        email_subject = EmailNotificationModel() \
 
                            .get_email_description(type_, **txt_kwargs)
 
        email_txt_body = EmailNotificationModel() \
 
                            .get_email_tmpl(type_, 'txt', **txt_kwargs)
 
        email_html_body = EmailNotificationModel() \
 
                            .get_email_tmpl(type_, 'html', **html_kwargs)
 

	
 
        # don't send email to person who created this comment
 
        rec_objs = set(recipients_objs).difference(set([created_by_obj]))
 

	
 
        # send email with notification to all other participants
 
        for rec in rec_objs:
 
            tasks.send_email([rec.email], email_subject, email_txt_body,
 
                     email_html_body, headers, author=created_by_obj)
 

	
 
        return notif
 

	
 
    def delete(self, user, notification):
 
        # we don't want to remove actual notification just the assignment
 
        try:
 
            notification = Notification.guess_instance(notification)
 
            user = User.guess_instance(user)
 
            if notification and user:
 
                obj = UserNotification.query() \
 
                        .filter(UserNotification.user == user) \
 
                        .filter(UserNotification.notification
 
                                == notification) \
 
                        .one()
 
                Session().delete(obj)
 
                return True
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def query_for_user(self, user, filter_=None):
 
        """
 
        Get notifications for given user, filter them if filter dict is given
 

	
 
        :param user:
 
        :param filter:
 
        """
 
        user = User.guess_instance(user)
 

	
 
        q = UserNotification.query() \
 
            .filter(UserNotification.user == user) \
 
            .join((Notification, UserNotification.notification_id ==
 
                                 Notification.notification_id)) \
 
            .options(joinedload('notification')) \
 
            .options(subqueryload('notification.created_by_user')) \
 
            .order_by(Notification.created_on.desc())
 

	
 
        if filter_:
 
            q = q.filter(Notification.type_.in_(filter_))
 

	
 
        return q
 

	
 
    def mark_read(self, user, notification):
 
        try:
 
            notification = Notification.guess_instance(notification)
 
            user = User.guess_instance(user)
 
            if notification and user:
 
                obj = UserNotification.query() \
 
                        .filter(UserNotification.user == user) \
 
                        .filter(UserNotification.notification
 
                                == notification) \
 
                        .one()
 
                obj.read = True
 
                return True
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def mark_all_read_for_user(self, user, filter_=None):
 
        user = User.guess_instance(user)
 
        q = UserNotification.query() \
 
            .filter(UserNotification.user == user) \
 
            .filter(UserNotification.read == False) \
 
            .join((Notification, UserNotification.notification_id ==
 
                                 Notification.notification_id))
 
        if filter_:
 
            q = q.filter(Notification.type_.in_(filter_))
 

	
 
        # this is a little inefficient but sqlalchemy doesn't support
 
        # update on joined tables :(
 
        for obj in q:
 
            obj.read = True
 

	
 
    def get_unread_cnt_for_user(self, user):
 
        user = User.guess_instance(user)
 
        return UserNotification.query() \
 
                .filter(UserNotification.read == False) \
 
                .filter(UserNotification.user == user).count()
 

	
 
    def get_unread_for_user(self, user):
 
        user = User.guess_instance(user)
 
        return [x.notification for x in UserNotification.query() \
 
                .filter(UserNotification.read == False) \
 
                .filter(UserNotification.user == user).all()]
 

	
 
    def get_user_notification(self, user, notification):
 
        user = User.guess_instance(user)
 
        notification = Notification.guess_instance(notification)
 

	
 
        return UserNotification.query() \
 
            .filter(UserNotification.notification == notification) \
 
            .filter(UserNotification.user == user).scalar()
 

	
 
    def make_description(self, notification, show_age=True):
 
        """
 
        Creates a human readable description based on properties
 
        of notification object
 
        """
 
        # alias
 
        _n = notification
 

	
 
        if show_age:
 
            return {
 
                    _n.TYPE_CHANGESET_COMMENT: _('%(user)s commented on changeset %(age)s'),
 
                    _n.TYPE_MESSAGE: _('%(user)s sent message %(age)s'),
 
                    _n.TYPE_MENTION: _('%(user)s mentioned you %(age)s'),
 
                    _n.TYPE_REGISTRATION: _('%(user)s registered in Kallithea %(age)s'),
 
                    _n.TYPE_PULL_REQUEST: _('%(user)s opened new pull request %(age)s'),
 
                    _n.TYPE_PULL_REQUEST_COMMENT: _('%(user)s commented on pull request %(age)s'),
 
                }[notification.type_] % dict(
 
                    user=notification.created_by_user.username,
 
                    age=h.age(notification.created_on),
 
                )
 
        else:
 
            return {
 
                    _n.TYPE_CHANGESET_COMMENT: _('%(user)s commented on changeset at %(when)s'),
 
                    _n.TYPE_MESSAGE: _('%(user)s sent message at %(when)s'),
 
                    _n.TYPE_MENTION: _('%(user)s mentioned you at %(when)s'),
 
                    _n.TYPE_REGISTRATION: _('%(user)s registered in Kallithea at %(when)s'),
 
                    _n.TYPE_PULL_REQUEST: _('%(user)s opened new pull request at %(when)s'),
 
                    _n.TYPE_PULL_REQUEST_COMMENT: _('%(user)s commented on pull request at %(when)s'),
 
                }[notification.type_] % dict(
 
                    user=notification.created_by_user.username,
 
                    when=h.fmt_date(notification.created_on),
 
                )
 

	
 

	
 
class EmailNotificationModel(object):
 

	
 
    TYPE_CHANGESET_COMMENT = Notification.TYPE_CHANGESET_COMMENT
 
    TYPE_MESSAGE = Notification.TYPE_MESSAGE # only used for testing
 
    # Notification.TYPE_MENTION is not used
 
    TYPE_PASSWORD_RESET = 'password_link'
 
    TYPE_REGISTRATION = Notification.TYPE_REGISTRATION
 
    TYPE_PULL_REQUEST = Notification.TYPE_PULL_REQUEST
 
    TYPE_PULL_REQUEST_COMMENT = Notification.TYPE_PULL_REQUEST_COMMENT
 
    TYPE_DEFAULT = 'default'
 

	
 
    def __init__(self):
 
        super(EmailNotificationModel, self).__init__()
 
        self._template_root = kallithea.CONFIG['paths']['templates'][0]
 
        self._tmpl_lookup = app_globals.mako_lookup
 
        self.email_types = {
 
            self.TYPE_CHANGESET_COMMENT: 'changeset_comment',
 
            self.TYPE_PASSWORD_RESET: 'password_reset',
 
            self.TYPE_REGISTRATION: 'registration',
 
            self.TYPE_DEFAULT: 'default',
 
            self.TYPE_PULL_REQUEST: 'pull_request',
 
            self.TYPE_PULL_REQUEST_COMMENT: 'pull_request_comment',
 
        }
 
        self._subj_map = {
 
            self.TYPE_CHANGESET_COMMENT: _('[Comment] %(repo_name)s changeset %(short_id)s "%(message_short)s" on %(branch)s'),
 
            self.TYPE_MESSAGE: 'Test Message',
 
            # self.TYPE_PASSWORD_RESET
 
            self.TYPE_REGISTRATION: _('New user %(new_username)s registered'),
 
            # self.TYPE_DEFAULT
 
            self.TYPE_PULL_REQUEST: _('[Review] %(repo_name)s PR %(pr_nice_id)s "%(pr_title_short)s" from %(pr_source_branch)s by %(pr_owner_username)s'),
 
            self.TYPE_PULL_REQUEST_COMMENT: _('[Comment] %(repo_name)s PR %(pr_nice_id)s "%(pr_title_short)s" from %(pr_source_branch)s by %(pr_owner_username)s'),
 
        }
 

	
 
    def get_email_description(self, type_, **kwargs):
 
        """
 
        return subject for email based on given type
 
        """
 
        tmpl = self._subj_map[type_]
 
        try:
 
            subj = tmpl % kwargs
 
        except KeyError as e:
 
            log.error('error generating email subject for %r from %s: %s', type_, ','.join(self._subj_map.keys()), e)
 
            raise
 
        l = [safe_unicode(x) for x in [kwargs.get('status_change'), kwargs.get('closing_pr') and _('Closing')] if x]
 
        if l:
 
            if subj.startswith('['):
 
                subj = '[' + ', '.join(l) + ': ' + subj[1:]
 
            else:
 
                subj = '[' + ', '.join(l) + '] ' + subj
 
        return subj
 

	
 
    def get_email_tmpl(self, type_, content_type, **kwargs):
 
        """
 
        return generated template for email based on given type
 
        """
 

	
 
        base = 'email_templates/' + self.email_types.get(type_, self.email_types[self.TYPE_DEFAULT]) + '.' + content_type
 
        email_template = self._tmpl_lookup.get_template(base)
 
        # translator and helpers inject
 
        _kwargs = {'_': _,
 
                   'h': h,
 
                   'c': c}
 
        _kwargs.update(kwargs)
 
        if content_type == 'html':
 
            _kwargs.update({
 
                "color_text": "#202020",
 
                "color_emph": "#395fa0",
 
                "color_link": "#395fa0",
 
                "color_border": "#ddd",
 
                "color_background_grey": "#f9f9f9",
 
                "color_button": "#395fa0",
 
                "monospace_style": "font-family:Lucida Console,Consolas,Monaco,Inconsolata,Liberation Mono,monospace",
 
                "sans_style": "font-family:Helvetica,Arial,sans-serif",
 
                })
 
            _kwargs.update({
 
                "default_style": "%(sans_style)s;font-weight:200;font-size:14px;line-height:17px;color:%(color_text)s" % _kwargs,
 
                "comment_style": "%(monospace_style)s;white-space:pre-wrap" % _kwargs,
 
                "data_style": "border:%(color_border)s 1px solid;background:%(color_background_grey)s" % _kwargs,
 
                "emph_style": "font-weight:600;color:%(color_emph)s" % _kwargs,
 
                "link_style": "color:%(color_link)s;text-decoration:none" % _kwargs,
 
                "link_text_style": "color:%(color_text)s;text-decoration:none;border:%(color_border)s 1px solid;background:%(color_background_grey)s" % _kwargs,
 
                })
 

	
 
        log.debug('rendering tmpl %s with kwargs %s', base, _kwargs)
 
        return email_template.render_unicode(**_kwargs)
kallithea/tests/models/test_notifications.py
Show inline comments
 
import os
 
import re
 

	
 
import mock
 
import routes.util
 

	
 
from kallithea.tests.base import *
 
from kallithea.lib import helpers as h
 
from kallithea.model.db import User, Notification
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 
from kallithea.model.notification import NotificationModel, EmailNotificationModel
 

	
 
import kallithea.lib.celerylib
 
import kallithea.lib.celerylib.tasks
 

	
 
from tg.util.webtest import test_context
 

	
 

	
 
class TestNotifications(TestController):
 

	
 
    def setup_method(self, method):
 
        Session.remove()
 
        u1 = UserModel().create_or_update(username=u'u1',
 
                                        password=u'qweqwe',
 
                                        email=u'u1@example.com',
 
                                        firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        self.u1 = u1.user_id
 

	
 
        u2 = UserModel().create_or_update(username=u'u2',
 
                                        password=u'qweqwe',
 
                                        email=u'u2@example.com',
 
                                        firstname=u'u2', lastname=u'u3')
 
        Session().commit()
 
        self.u2 = u2.user_id
 

	
 
        u3 = UserModel().create_or_update(username=u'u3',
 
                                        password=u'qweqwe',
 
                                        email=u'u3@example.com',
 
                                        firstname=u'u3', lastname=u'u3')
 
        Session().commit()
 
        self.u3 = u3.user_id
 

	
 
    def test_create_notification(self):
 
        with test_context(self.app):
 
            usrs = [self.u1, self.u2]
 

	
 
            def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
                assert recipients == ['u2@example.com']
 
                assert subject == 'Test Message'
 
                assert body == u"hi there"
 
                assert '>hi there<' in html_body
 
                assert author.username == 'u1'
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                notification = NotificationModel().create(created_by=self.u1,
 
                NotificationModel().create(created_by=self.u1,
 
                                                   subject=u'subj', body=u'hi there',
 
                                                   recipients=usrs)
 

	
 
    @mock.patch.object(h, 'canonical_url', (lambda arg, **kwargs: 'http://%s/?%s' % (arg, '&'.join('%s=%s' % (k, v) for (k, v) in sorted(kwargs.items())))))
 
    def test_dump_html_mails(self):
 
        # Exercise all notification types and dump them to one big html file
 
        l = []
 

	
 
        def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
            l.append('<hr/>\n')
 
            l.append('<h1>%s</h1>\n' % desc) # desc is from outer scope
 
            l.append('<pre>\n')
 
            l.append('From: %s\n' % author.username)
 
            l.append('To: %s\n' % ' '.join(recipients))
 
            l.append('Subject: %s\n' % subject)
 
            l.append('</pre>\n')
 
            l.append('<hr/>\n')
 
            l.append('<pre>%s</pre>\n' % body)
 
            l.append('<hr/>\n')
 
            l.append(html_body)
 
            l.append('<hr/>\n')
 

	
 
        with test_context(self.app):
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                pr_kwargs = dict(
 
                    pr_nice_id='#7',
 
                    pr_title='The Title',
 
                    pr_title_short='The Title',
 
                    pr_url='http://pr.org/7',
 
                    pr_target_repo='http://mainline.com/repo',
 
                    pr_target_branch='trunk',
 
                    pr_source_repo='https://dev.org/repo',
 
                    pr_source_branch='devbranch',
 
                    pr_owner=User.get(self.u2),
 
                    pr_owner_username='u2'
 
                    )
 

	
 
                for type_, body, kwargs in [
 
                        (Notification.TYPE_CHANGESET_COMMENT,
 
                         u'This is the new \'comment\'.\n\n - and here it ends indented.',
 
                         dict(
 
                            short_id='cafe1234',
 
                            raw_id='cafe1234c0ffeecafe',
 
                            branch='brunch',
 
                            cs_comment_user='Opinionated User (jsmith)',
 
                            cs_comment_url='http://comment.org',
 
                            is_mention=[False, True],
 
                            message='This changeset did something clever which is hard to explain',
 
                            message_short='This changeset did something cl...',
 
                            status_change=[None, 'Approved'],
 
                            cs_target_repo='http://example.com/repo_target',
 
                            cs_url='http://changeset.com',
 
                            cs_author=User.get(self.u2))),
 
                        (Notification.TYPE_MESSAGE,
 
                         u'This is the \'body\' of the "test" message\n - nothing interesting here except indentation.',
 
                         dict()),
 
                        #(Notification.TYPE_MENTION, '$body', None), # not used
 
                        (Notification.TYPE_REGISTRATION,
 
                         u'Registration body',
 
                         dict(
 
                            new_username='newbie',
 
                            registered_user_url='http://newbie.org',
 
                            new_email='new@email.com',
 
                            new_full_name='New Full Name')),
 
                        (Notification.TYPE_PULL_REQUEST,
 
                         u'This PR is \'awesome\' because it does <stuff>\n - please approve indented!',
 
                         dict(
 
                            pr_user_created='Requesting User (root)', # pr_owner should perhaps be used for @mention in description ...
 
                            is_mention=[False, True],
 
                            pr_revisions=[('123abc'*7, "Introduce one and two\n\nand that's it"), ('567fed'*7, 'Make one plus two equal tree')],
 
                            org_repo_name='repo_org',
 
                            **pr_kwargs)),
 
                        (Notification.TYPE_PULL_REQUEST_COMMENT,
 
                         u'Me too!\n\n - and indented on second line',
 
                         dict(
 
                            closing_pr=[False, True],
 
                            is_mention=[False, True],
 
                            pr_comment_user='Opinionated User (jsmith)',
 
                            pr_comment_url='http://pr.org/comment',
 
                            status_change=[None, 'Under Review'],
 
                            **pr_kwargs)),
 
                        ]:
 
                    kwargs['repo_name'] = u'repo/name'
 
                    params = [(type_, type_, body, kwargs)]
 
                    for param_name in ['is_mention', 'status_change', 'closing_pr']: # TODO: inline/general
 
                        if not isinstance(kwargs.get(param_name), list):
 
                            continue
 
                        new_params = []
 
                        for v in kwargs[param_name]:
 
                            for desc, type_, body, kwargs in params:
 
                                kwargs = dict(kwargs)
 
                                kwargs[param_name] = v
 
                                new_params.append(('%s, %s=%r' % (desc, param_name, v), type_, body, kwargs))
 
                        params = new_params
 

	
 
                    for desc, type_, body, kwargs in params:
 
                        # desc is used as "global" variable
 
                        notification = NotificationModel().create(created_by=self.u1,
 
                        NotificationModel().create(created_by=self.u1,
 
                                                           subject=u'unused', body=body, email_kwargs=kwargs,
 
                                                           recipients=[self.u2], type_=type_)
 

	
 
                # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
 
                desc = 'TYPE_PASSWORD_RESET'
 
                kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
 
                kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
 
                    "Password reset link",
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
 
                    author=User.get(self.u1))
 

	
 
        out = '<!doctype html>\n<html lang="en">\n<head><title>Notifications</title><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head>\n<body>\n%s\n</body>\n</html>\n' % \
 
            re.sub(r'<(/?(?:!doctype|html|head|title|meta|body)\b[^>]*)>', r'<!--\1-->', ''.join(l))
 

	
 
        outfn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.out.html')
 
        reffn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.ref.html')
 
        with open(outfn, 'w') as f:
 
            f.write(out)
 
        with open(reffn) as f:
 
            ref = f.read()
 
        assert ref == out # copy test_dump_html_mails.out.html to test_dump_html_mails.ref.html to update expectations
 
        os.unlink(outfn)
0 comments (0 inline, 0 general)