Changeset - 802fdeefc8cc
[Not reviewed]
default
0 2 1
Mads Kiilerich - 6 years ago 2020-03-12 22:52:22
mads@kiilerich.com
hg: always show and run Mercurial hooks in alphabetical order (Issue #246)

Mercurial will generally run hooks in the order they are found in the
configuration. For entries found in the database, there is no such order.
Instead, always use alphabetical order for these.

Since we now want to order things explicitly in the db query, we want an index
with a composite key. We do that even though we don't really need it for the
few entries in this table, and even though it might/could use the same index as
the existing unique constraint. This composite UniqueConstraint was added in
b9f4b444a172 where it replaced a wrong UniqueConstraint that could/should have
been removed in c25191aadf92. Fix that while touching this area and running a
migration script.
3 files changed with 45 insertions and 4 deletions:
0 comments (0 inline, 0 general)
kallithea/alembic/versions/a0a1bf09c143_db_add_ui_composite_index_and_drop_.py
Show inline comments
 
new file 100644
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""db: add Ui composite index and drop UniqueConstraint on Ui.ui_key
 

	
 
Revision ID: a0a1bf09c143
 
Revises: d7ec25b66e47
 
Create Date: 2020-03-12 22:41:14.421837
 

	
 
"""
 

	
 
# The following opaque hexadecimal identifiers ("revisions") are used
 
# by Alembic to track this migration script and its relations to others.
 
revision = 'a0a1bf09c143'
 
down_revision = 'd7ec25b66e47'
 
branch_labels = None
 
depends_on = None
 

	
 
from alembic import op
 

	
 

	
 
def upgrade():
 
    with op.batch_alter_table('ui', schema=None) as batch_op:
 
        batch_op.create_index('ui_ui_section_ui_key_idx', ['ui_section', 'ui_key'], unique=False)
 
        batch_op.drop_constraint('uq_ui_ui_key', type_='unique')
 

	
 

	
 
def downgrade():
 
    with op.batch_alter_table('ui', schema=None) as batch_op:
 
        batch_op.create_unique_constraint('uq_ui_ui_key', ['ui_key'])
 
        batch_op.drop_index('ui_ui_section_ui_key_idx')
kallithea/lib/utils.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Utilities library for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 
import os
 
import re
 
import sys
 
import traceback
 
import urllib.error
 
from distutils.version import StrictVersion
 

	
 
import beaker.cache
 
import mercurial.config
 
import mercurial.ui
 

	
 
import kallithea.config.conf
 
from kallithea.lib.exceptions import InvalidCloneUriException
 
from kallithea.lib.utils2 import ascii_bytes, aslist, get_current_authuser, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.git.repository import GitRepository
 
from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils.fakemod import create_module
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model import meta
 
from kallithea.model.db import RepoGroup, Repository, Setting, Ui, User, UserGroup, UserLog
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
 

	
 

	
 
#==============================================================================
 
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
 
#==============================================================================
 
def get_repo_slug(request):
 
    _repo = request.environ['pylons.routes_dict'].get('repo_name')
 
    if _repo:
 
        _repo = _repo.rstrip('/')
 
    return _repo
 

	
 

	
 
def get_repo_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('group_name')
 
    if _group:
 
        _group = _group.rstrip('/')
 
    return _group
 

	
 

	
 
def get_user_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('id')
 
    _group = UserGroup.get(_group)
 
    if _group:
 
        return _group.users_group_name
 
    return None
 

	
 

	
 
def _get_permanent_id(s):
 
    """Helper for decoding stable URLs with repo ID. For a string like '_123'
 
    return 123.
 
    """
 
    by_id_match = re.match(r'^_(\d+)$', s)
 
    if by_id_match is None:
 
        return None
 
    return int(by_id_match.group(1))
 

	
 

	
 
def fix_repo_id_name(path):
 
    """
 
    Rewrite repo_name for _<ID> permanent URLs.
 

	
 
    Given a path, if the first path element is like _<ID>, return the path with
 
    this part expanded to the corresponding full repo name, else return the
 
    provided path.
 
    """
 
    first, rest = path, ''
 
    if '/' in path:
 
        first, rest_ = path.split('/', 1)
 
        rest = '/' + rest_
 
    repo_id = _get_permanent_id(first)
 
    if repo_id is not None:
 
        repo = Repository.get(repo_id)
 
        if repo is not None:
 
            return repo.repo_name + rest
 
    return path
 

	
 

	
 
def action_logger(user, action, repo, ipaddr='', commit=False):
 
    """
 
    Action logger for various actions made by users
 

	
 
    :param user: user that made this action, can be a unique username string or
 
        object containing user_id attribute
 
    :param action: action to log, should be on of predefined unique actions for
 
        easy translations
 
    :param repo: string name of repository or object containing repo_id,
 
        that action was made on
 
    :param ipaddr: optional IP address from what the action was made
 

	
 
    """
 

	
 
    # if we don't get explicit IP address try to get one from registered user
 
    # in tmpl context var
 
    if not ipaddr:
 
        ipaddr = getattr(get_current_authuser(), 'ip_addr', '')
 

	
 
    if getattr(user, 'user_id', None):
 
        user_obj = User.get(user.user_id)
 
    elif isinstance(user, str):
 
        user_obj = User.get_by_username(user)
 
    else:
 
        raise Exception('You have to provide a user object or a username')
 

	
 
    if getattr(repo, 'repo_id', None):
 
        repo_obj = Repository.get(repo.repo_id)
 
        repo_name = repo_obj.repo_name
 
    elif isinstance(repo, str):
 
        repo_name = repo.lstrip('/')
 
        repo_obj = Repository.get_by_repo_name(repo_name)
 
    else:
 
        repo_obj = None
 
        repo_name = ''
 

	
 
    user_log = UserLog()
 
    user_log.user_id = user_obj.user_id
 
    user_log.username = user_obj.username
 
    user_log.action = action
 

	
 
    user_log.repository = repo_obj
 
    user_log.repository_name = repo_name
 

	
 
    user_log.action_date = datetime.datetime.now()
 
    user_log.user_ip = ipaddr
 
    meta.Session().add(user_log)
 

	
 
    log.info('Logging action:%s on %s by user:%s ip:%s',
 
             action, repo, user_obj, ipaddr)
 
    if commit:
 
        meta.Session().commit()
 

	
 

	
 
def get_filesystem_repos(path):
 
    """
 
    Scans given path for repos and return (name,(type,path)) tuple
 

	
 
    :param path: path to scan for repositories
 
    :param recursive: recursive search and return names with subdirs in front
 
    """
 

	
 
    # remove ending slash for better results
 
    path = path.rstrip(os.sep)
 
    log.debug('now scanning in %s', path)
 

	
 
    def isdir(*n):
 
        return os.path.isdir(os.path.join(*n))
 

	
 
    for root, dirs, _files in os.walk(path):
 
        recurse_dirs = []
 
        for subdir in dirs:
 
            # skip removed repos
 
            if REMOVED_REPO_PAT.match(subdir):
 
                continue
 

	
 
            # skip .<something> dirs TODO: rly? then we should prevent creating them ...
 
            if subdir.startswith('.'):
 
                continue
 

	
 
            cur_path = os.path.join(root, subdir)
 
            if isdir(cur_path, '.git'):
 
                log.warning('ignoring non-bare Git repo: %s', cur_path)
 
                continue
 

	
 
            if (isdir(cur_path, '.hg') or
 
                isdir(cur_path, '.svn') or
 
                isdir(cur_path, 'objects') and (isdir(cur_path, 'refs') or
 
                                                os.path.isfile(os.path.join(cur_path, 'packed-refs')))):
 

	
 
                if not os.access(cur_path, os.R_OK) or not os.access(cur_path, os.X_OK):
 
                    log.warning('ignoring repo path without access: %s', cur_path)
 
                    continue
 

	
 
                if not os.access(cur_path, os.W_OK):
 
                    log.warning('repo path without write access: %s', cur_path)
 

	
 
                try:
 
                    scm_info = get_scm(cur_path)
 
                    assert cur_path.startswith(path)
 
                    repo_path = cur_path[len(path) + 1:]
 
                    yield repo_path, scm_info
 
                    continue # no recursion
 
                except VCSError:
 
                    # We should perhaps ignore such broken repos, but especially
 
                    # the bare git detection is unreliable so we dive into it
 
                    pass
 

	
 
            recurse_dirs.append(subdir)
 

	
 
        dirs[:] = recurse_dirs
 

	
 

	
 
def is_valid_repo_uri(repo_type, url, ui):
 
    """Check if the url seems like a valid remote repo location
 
    Raise InvalidCloneUriException if any problems"""
 
    if repo_type == 'hg':
 
        if url.startswith('http') or url.startswith('ssh'):
 
            # initially check if it's at least the proper URL
 
            # or does it pass basic auth
 
            try:
 
                MercurialRepository._check_url(url, ui)
 
            except urllib.error.URLError as e:
 
                raise InvalidCloneUriException('URI %s URLError: %s' % (url, e))
 
        elif url.startswith('svn+http'):
 
            try:
 
                from hgsubversion.svnrepo import svnremoterepo
 
            except ImportError:
 
                raise InvalidCloneUriException('URI type %s not supported - hgsubversion is not available' % (url,))
 
            svnremoterepo(ui, url).svn.uuid
 
        elif url.startswith('git+http'):
 
            raise InvalidCloneUriException('URI type %s not implemented' % (url,))
 
        else:
 
            raise InvalidCloneUriException('URI %s not allowed' % (url,))
 

	
 
    elif repo_type == 'git':
 
        if url.startswith('http') or url.startswith('git'):
 
            # initially check if it's at least the proper URL
 
            # or does it pass basic auth
 
            try:
 
                GitRepository._check_url(url)
 
            except urllib.error.URLError as e:
 
                raise InvalidCloneUriException('URI %s URLError: %s' % (url, e))
 
        elif url.startswith('svn+http'):
 
            raise InvalidCloneUriException('URI type %s not implemented' % (url,))
 
        elif url.startswith('hg+http'):
 
            raise InvalidCloneUriException('URI type %s not implemented' % (url,))
 
        else:
 
            raise InvalidCloneUriException('URI %s not allowed' % (url))
 

	
 

	
 
def is_valid_repo(repo_name, base_path, scm=None):
 
    """
 
    Returns True if given path is a valid repository False otherwise.
 
    If scm param is given also compare if given scm is the same as expected
 
    from scm parameter
 

	
 
    :param repo_name:
 
    :param base_path:
 
    :param scm:
 

	
 
    :return True: if given path is a valid repository
 
    """
 
    # TODO: paranoid security checks?
 
    full_path = os.path.join(base_path, repo_name)
 

	
 
    try:
 
        scm_ = get_scm(full_path)
 
        if scm:
 
            return scm_[0] == scm
 
        return True
 
    except VCSError:
 
        return False
 

	
 

	
 
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
 
    """
 
    Returns True if given path is a repository group False otherwise
 

	
 
    :param repo_name:
 
    :param base_path:
 
    """
 
    full_path = os.path.join(base_path, repo_group_name)
 

	
 
    # check if it's not a repo
 
    if is_valid_repo(repo_group_name, base_path):
 
        return False
 

	
 
    try:
 
        # we need to check bare git repos at higher level
 
        # since we might match branches/hooks/info/objects or possible
 
        # other things inside bare git repo
 
        get_scm(os.path.dirname(full_path))
 
        return False
 
    except VCSError:
 
        pass
 

	
 
    # check if it's a valid path
 
    if skip_path_check or os.path.isdir(full_path):
 
        return True
 

	
 
    return False
 

	
 

	
 
# propagated from mercurial documentation
 
ui_sections = ['alias', 'auth',
 
                'decode/encode', 'defaults',
 
                'diff', 'email',
 
                'extensions', 'format',
 
                'merge-patterns', 'merge-tools',
 
                'hooks', 'http_proxy',
 
                'smtp', 'patch',
 
                'paths', 'profiling',
 
                'server', 'trusted',
 
                'ui', 'web', ]
 

	
 

	
 
def make_ui(repo_path=None):
 
    """
 
    Create an Mercurial 'ui' object based on database Ui settings, possibly
 
    augmenting with content from a hgrc file.
 
    """
 
    baseui = mercurial.ui.ui()
 

	
 
    # clean the baseui object
 
    baseui._ocfg = mercurial.config.config()
 
    baseui._ucfg = mercurial.config.config()
 
    baseui._tcfg = mercurial.config.config()
 

	
 
    sa = meta.Session()
 
    for ui_ in sa.query(Ui).all():
 
    for ui_ in sa.query(Ui).order_by(Ui.ui_section, Ui.ui_key):
 
        if ui_.ui_active:
 
            log.debug('config from db: [%s] %s=%r', ui_.ui_section,
 
                      ui_.ui_key, ui_.ui_value)
 
            baseui.setconfig(ascii_bytes(ui_.ui_section), ascii_bytes(ui_.ui_key),
 
                             b'' if ui_.ui_value is None else safe_bytes(ui_.ui_value))
 

	
 
    # force set push_ssl requirement to False, Kallithea handles that
 
    baseui.setconfig(b'web', b'push_ssl', False)
 
    baseui.setconfig(b'web', b'allow_push', b'*')
 
    # prevent interactive questions for ssh password / passphrase
 
    ssh = baseui.config(b'ui', b'ssh', default=b'ssh')
 
    baseui.setconfig(b'ui', b'ssh', b'%s -oBatchMode=yes -oIdentitiesOnly=yes' % ssh)
 
    # push / pull hooks
 
    baseui.setconfig(b'hooks', b'changegroup.kallithea_log_push_action', b'python:kallithea.lib.hooks.log_push_action')
 
    baseui.setconfig(b'hooks', b'outgoing.kallithea_log_pull_action', b'python:kallithea.lib.hooks.log_pull_action')
 

	
 
    if repo_path is not None:
 
        hgrc_path = os.path.join(repo_path, '.hg', 'hgrc')
 
        if os.path.isfile(hgrc_path):
 
            log.debug('reading hgrc from %s', hgrc_path)
 
            cfg = mercurial.config.config()
 
            cfg.read(safe_bytes(hgrc_path))
 
            for section in ui_sections:
 
                for k, v in cfg.items(section):
 
                    log.debug('config from file: [%s] %s=%s', section, k, v)
 
                    baseui.setconfig(ascii_bytes(section), ascii_bytes(k), safe_bytes(v))
 
        else:
 
            log.debug('hgrc file is not present at %s, skipping...', hgrc_path)
 

	
 
    assert baseui.plain()  # set by hgcompat.monkey_do (invoked from import of vcs.backends.hg) to minimize potential impact of loading config files
 
    return baseui
 

	
 

	
 
def set_app_settings(config):
 
    """
 
    Updates app config with new settings from database
 

	
 
    :param config:
 
    """
 
    hgsettings = Setting.get_app_settings()
 
    for k, v in hgsettings.items():
 
        config[k] = v
 

	
 

	
 
def set_vcs_config(config):
 
    """
 
    Patch VCS config with some Kallithea specific stuff
 

	
 
    :param config: kallithea.CONFIG
 
    """
 
    settings.BACKENDS = {
 
        'hg': 'kallithea.lib.vcs.backends.hg.MercurialRepository',
 
        'git': 'kallithea.lib.vcs.backends.git.GitRepository',
 
    }
 

	
 
    settings.GIT_EXECUTABLE_PATH = config.get('git_path', 'git')
 
    settings.GIT_REV_FILTER = config.get('git_rev_filter', '--all').strip()
 
    settings.DEFAULT_ENCODINGS = aslist(config.get('default_encoding',
 
                                                        'utf-8'), sep=',')
 

	
 

	
 
def set_indexer_config(config):
 
    """
 
    Update Whoosh index mapping
 

	
 
    :param config: kallithea.CONFIG
 
    """
 
    log.debug('adding extra into INDEX_EXTENSIONS')
 
    kallithea.config.conf.INDEX_EXTENSIONS.extend(re.split(r'\s+', config.get('index.extensions', '')))
 

	
 
    log.debug('adding extra into INDEX_FILENAMES')
 
    kallithea.config.conf.INDEX_FILENAMES.extend(re.split(r'\s+', config.get('index.filenames', '')))
 

	
 

	
 
def map_groups(path):
 
    """
 
    Given a full path to a repository, create all nested groups that this
 
    repo is inside. This function creates parent-child relationships between
 
    groups and creates default perms for all new groups.
 

	
 
    :param paths: full path to repository
 
    """
 
    from kallithea.model.repo_group import RepoGroupModel
 
    sa = meta.Session()
 
    groups = path.split(Repository.url_sep())
 
    parent = None
 
    group = None
 

	
 
    # last element is repo in nested groups structure
 
    groups = groups[:-1]
 
    rgm = RepoGroupModel()
 
    owner = User.get_first_admin()
 
    for lvl, group_name in enumerate(groups):
 
        group_name = '/'.join(groups[:lvl] + [group_name])
 
        group = RepoGroup.get_by_group_name(group_name)
 
        desc = '%s group' % group_name
 

	
 
        # skip folders that are now removed repos
 
        if REMOVED_REPO_PAT.match(group_name):
 
            break
 

	
 
        if group is None:
 
            log.debug('creating group level: %s group_name: %s',
 
                      lvl, group_name)
 
            group = RepoGroup(group_name, parent)
 
            group.group_description = desc
 
            group.owner = owner
 
            sa.add(group)
 
            rgm._create_default_perms(group)
 
            sa.flush()
 

	
 
        parent = group
 
    return group
 

	
 

	
 
def repo2db_mapper(initial_repo_dict, remove_obsolete=False,
 
                   install_git_hooks=False, user=None, overwrite_git_hooks=False):
 
    """
 
    maps all repos given in initial_repo_dict, non existing repositories
 
    are created, if remove_obsolete is True it also check for db entries
 
    that are not in initial_repo_dict and removes them.
 

	
 
    :param initial_repo_dict: mapping with repositories found by scanning methods
 
    :param remove_obsolete: check for obsolete entries in database
 
    :param install_git_hooks: if this is True, also check and install git hook
 
        for a repo if missing
 
    :param overwrite_git_hooks: if this is True, overwrite any existing git hooks
 
        that may be encountered (even if user-deployed)
 
    """
 
    from kallithea.model.repo import RepoModel
 
    from kallithea.model.scm import ScmModel
 
    sa = meta.Session()
 
    repo_model = RepoModel()
 
    if user is None:
 
        user = User.get_first_admin()
 
    added = []
 

	
 
    # creation defaults
 
    defs = Setting.get_default_repo_settings(strip_prefix=True)
 
    enable_statistics = defs.get('repo_enable_statistics')
 
    enable_downloads = defs.get('repo_enable_downloads')
 
    private = defs.get('repo_private')
 

	
 
    for name, repo in initial_repo_dict.items():
 
        group = map_groups(name)
 
        db_repo = repo_model.get_by_repo_name(name)
 
        # found repo that is on filesystem not in Kallithea database
 
        if not db_repo:
 
            log.info('repository %s not found, creating now', name)
 
            added.append(name)
 
            desc = (repo.description
 
                    if repo.description != 'unknown'
 
                    else '%s repository' % name)
 

	
 
            new_repo = repo_model._create_repo(
 
                repo_name=name,
 
                repo_type=repo.alias,
 
                description=desc,
 
                repo_group=getattr(group, 'group_id', None),
 
                owner=user,
 
                enable_downloads=enable_downloads,
 
                enable_statistics=enable_statistics,
 
                private=private,
 
                state=Repository.STATE_CREATED
 
            )
 
            sa.commit()
 
            # we added that repo just now, and make sure it has githook
 
            # installed, and updated server info
 
            if new_repo.repo_type == 'git':
 
                git_repo = new_repo.scm_instance
 
                ScmModel().install_git_hooks(git_repo)
 
                # update repository server-info
 
                log.debug('Running update server info')
 
                git_repo._update_server_info()
 
            new_repo.update_changeset_cache()
 
        elif install_git_hooks:
 
            if db_repo.repo_type == 'git':
 
                ScmModel().install_git_hooks(db_repo.scm_instance, force_create=overwrite_git_hooks)
 

	
 
    removed = []
 
    # remove from database those repositories that are not in the filesystem
 
    for repo in sa.query(Repository).all():
 
        if repo.repo_name not in initial_repo_dict:
 
            if remove_obsolete:
 
                log.debug("Removing non-existing repository found in db `%s`",
 
                          repo.repo_name)
 
                try:
 
                    RepoModel().delete(repo, forks='detach', fs_remove=False)
 
                    sa.commit()
 
                except Exception:
 
                    #don't hold further removals on error
 
                    log.error(traceback.format_exc())
 
                    sa.rollback()
 
            removed.append(repo.repo_name)
 
    return added, removed
 

	
 

	
 
def load_rcextensions(root_path):
 
    path = os.path.join(root_path, 'rcextensions', '__init__.py')
 
    if os.path.isfile(path):
 
        rcext = create_module('rc', path)
 
        EXT = kallithea.EXTENSIONS = rcext
 
        log.debug('Found rcextensions now loading %s...', rcext)
 

	
 
        # Additional mappings that are not present in the pygments lexers
 
        kallithea.config.conf.LANGUAGES_EXTENSIONS_MAP.update(getattr(EXT, 'EXTRA_MAPPINGS', {}))
 

	
 
        # OVERRIDE OUR EXTENSIONS FROM RC-EXTENSIONS (if present)
 

	
 
        if getattr(EXT, 'INDEX_EXTENSIONS', []):
 
            log.debug('settings custom INDEX_EXTENSIONS')
 
            kallithea.config.conf.INDEX_EXTENSIONS = getattr(EXT, 'INDEX_EXTENSIONS', [])
 

	
 
        # ADDITIONAL MAPPINGS
 
        log.debug('adding extra into INDEX_EXTENSIONS')
 
        kallithea.config.conf.INDEX_EXTENSIONS.extend(getattr(EXT, 'EXTRA_INDEX_EXTENSIONS', []))
 

	
 
        # auto check if the module is not missing any data, set to default if is
 
        # this will help autoupdate new feature of rcext module
 
        #from kallithea.config import rcextensions
 
        #for k in dir(rcextensions):
 
        #    if not k.startswith('_') and not hasattr(EXT, k):
 
        #        setattr(EXT, k, getattr(rcextensions, k))
 

	
 

	
 
#==============================================================================
 
# MISC
 
#==============================================================================
 

	
 
git_req_ver = StrictVersion('1.7.4')
 

	
 
def check_git_version():
 
    """
 
    Checks what version of git is installed on the system, and raise a system exit
 
    if it's too old for Kallithea to work properly.
 
    """
 
    if 'git' not in kallithea.BACKENDS:
 
        return None
 

	
 
    if not settings.GIT_EXECUTABLE_PATH:
 
        log.warning('No git executable configured - check "git_path" in the ini file.')
 
        return None
 

	
 
    try:
 
        stdout, stderr = GitRepository._run_git_command(['--version'])
 
    except RepositoryError as e:
 
        # message will already have been logged as error
 
        log.warning('No working git executable found - check "git_path" in the ini file.')
 
        return None
 

	
 
    if stderr:
 
        log.warning('Error/stderr from "%s --version":\n%s', settings.GIT_EXECUTABLE_PATH, safe_str(stderr))
 

	
 
    if not stdout:
 
        log.warning('No working git executable found - check "git_path" in the ini file.')
 
        return None
 

	
 
    output = safe_str(stdout).strip()
 
    m = re.search(r"\d+.\d+.\d+", output)
 
    if m:
 
        ver = StrictVersion(m.group(0))
 
        log.debug('Git executable: "%s", version %s (parsed from: "%s")',
 
                  settings.GIT_EXECUTABLE_PATH, ver, output)
 
        if ver < git_req_ver:
 
            log.error('Kallithea detected %s version %s, which is too old '
 
                      'for the system to function properly. '
 
                      'Please upgrade to version %s or later. '
 
                      'If you strictly need Mercurial repositories, you can '
 
                      'clear the "git_path" setting in the ini file.',
 
                      settings.GIT_EXECUTABLE_PATH, ver, git_req_ver)
 
            log.error("Terminating ...")
 
            sys.exit(1)
 
    else:
 
        ver = StrictVersion('0.0.0')
 
        log.warning('Error finding version number in "%s --version" stdout:\n%s',
 
                    settings.GIT_EXECUTABLE_PATH, output)
 

	
 
    return ver
 

	
 

	
 
#===============================================================================
 
# CACHE RELATED METHODS
 
#===============================================================================
 

	
 
def conditional_cache(region, prefix, condition, func):
 
    """
 

	
 
    Conditional caching function use like::
 
        def _c(arg):
 
            #heavy computation function
 
            return data
 

	
 
        # depending from condition the compute is wrapped in cache or not
 
        compute = conditional_cache('short_term', 'cache_desc', condition=True, func=func)
 
        return compute(arg)
 

	
 
    :param region: name of cache region
 
    :param prefix: cache region prefix
 
    :param condition: condition for cache to be triggered, and return data cached
 
    :param func: wrapped heavy function to compute
 

	
 
    """
 
    wrapped = func
 
    if condition:
 
        log.debug('conditional_cache: True, wrapping call of '
 
                  'func: %s into %s region cache' % (region, func))
 
        wrapped = beaker.cache._cache_decorate((prefix,), None, None, region)(func)
 

	
 
    return wrapped
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import logging
 
import os
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
from beaker.cache import cache_region, region_invalidate
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import (Optional, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, str2bool,
 
                                  urlreadable)
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.meta import Base, Session
 

	
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
def _hash_key(k):
 
    return hashlib.md5(safe_bytes(k)).hexdigest()
 

	
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        # Note: not a normal dict - iterator gives "users.firstname", but keys gives "firstname"
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.items():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        return [
 
            (k, getattr(self, k))
 
            for k in self._get_keys()
 
        ]
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, int):
 
            return cls.get(value)
 
        if isinstance(value, str) and value.isdigit():
 
            return cls.get(int(value))
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        Session().delete(obj)
 

	
 
    def __repr__(self):
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'mysql_charset': 'utf8',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_bytes,
 
        'int': safe_int,
 
        'unicode': safe_str,
 
        'bool': str2bool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 
    DEFAULT_UPDATE_URL = ''
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert isinstance(val, str)
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use str in app_settings_value
 
        """
 
        self._app_settings_value = safe_str(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (list(self.SETTINGS_TYPES), val))
 
        self._app_settings_type = val
 

	
 
    def __repr__(self):
 
        return "<%s %s.%s=%r>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_type, self.app_settings_value
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set. Optional instance will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            val = Optional.extract(val)
 
            type = Optional.extract(type)
 
            res = cls(key, val, type)
 
            Session().add(res)
 
        else:
 
            res.app_settings_name = key
 
            if not isinstance(val, Optional):
 
                # update if set
 
                res.app_settings_value = val
 
            if not isinstance(type, Optional):
 
                # update if set
 
                res.app_settings_type = type
 
        return res
 

	
 
    @classmethod
 
    def get_app_settings(cls, cache=False):
 

	
 
        ret = cls.query()
 

	
 
        if cache:
 
            ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
 

	
 
        if ret is None:
 
            raise Exception('Could not get application settings !')
 
        settings = {}
 
        for each in ret:
 
            settings[each.app_settings_name] = \
 
                each.app_settings_value
 

	
 
        return settings
 

	
 
    @classmethod
 
    def get_auth_settings(cls, cache=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('auth_')).all()
 
        fd = {}
 
        for row in ret:
 
            fd[row.app_settings_name] = row.app_settings_value
 
        return fd
 

	
 
    @classmethod
 
    def get_default_repo_settings(cls, cache=False, strip_prefix=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('default_')).all()
 
        fd = {}
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import pkg_resources
 
        import platform
 
        from kallithea.lib.utils import check_git_version
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': platform.platform(),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': str(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        # FIXME: ui_key as key is wrong and should be removed when the corresponding
 
        # Ui.get_by_key has been replaced by the composite key
 
        UniqueConstraint('ui_key'),
 
        Index('ui_ui_section_ui_key_idx', 'ui_section', 'ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.update'
 
    HOOK_REPO_SIZE = 'changegroup.repo_size'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.order_by(cls.ui_section, cls.ui_key)
 
        return q.all()
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.order_by(cls.ui_section, cls.ui_key)
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s %s.%s=%r>' % (
 
            self.__class__.__name__,
 
            self.ui_section, self.ui_key, self.ui_value)
 

	
 

	
 
class User(Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 
    ssh_keys = relationship('UserSshKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @hybrid_property
 
    def is_default_user(self):
 
        return self.username == User.DEFAULT_USER
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return ext_json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r')>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(User, cls).guess_instance(value, User.get_by_username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseDbModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if not allow_default and user.is_default_user:
 
            raise DefaultUserException()
 
        return user
 

	
 
    @classmethod
 
    def get_by_username_or_email(cls, username_or_email, case_insensitive=False, cache=False):
 
        """
 
        For anything that looks like an email address, look up by the email address (matching
 
        case insensitively).
 
        For anything else, try to look up by the user name.
 

	
 
        This assumes no normal username can have '@' symbol.
 
        """
 
        if '@' in username_or_email:
 
            return User.get_by_email(username_or_email, cache=cache)
 
        else:
 
            return User.get_by_username(username_or_email, case_insensitive=case_insensitive, cache=cache)
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False, cache=False):
 
        if case_insensitive:
 
            q = cls.query().filter(sqlalchemy.func.lower(cls.username) == sqlalchemy.func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 

	
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(username)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, cache=False, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_api_key_%s" % api_key))
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            # fallback to additional keys
 
            _res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
 
            if _res:
 
                res = _res.user
 
        if res is None or not res.active or res.is_default_user:
 
            return None
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(sqlalchemy.func.lower(cls.email) == sqlalchemy.func.lower(email))
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_email_key_%s" % email))
 

	
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(sqlalchemy.func.lower(UserEmailMap.email) == sqlalchemy.func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls, cache=False):
 
        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            emails=user.emails,
 
            active=user.active,
 
            admin=user.admin,
 
        )
 
        if details:
 
            data.update(dict(
 
                extern_type=user.extern_type,
 
                extern_name=user.extern_name,
 
                api_key=user.api_key,
 
                api_keys=user.api_keys,
 
                last_login=user.last_login,
 
                ip_addresses=user.ip_addresses
 
                ))
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
            full_name=self.full_name,
 
            full_name_or_username=self.full_name_or_username,
 
            short_contact=self.short_contact,
 
            full_contact=self.full_contact
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 

	
 
class UserApiKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_api_keys'
 
    __table_args__ = (
 
        Index('uak_api_key_idx', 'api_key'),
 
        Index('uak_api_key_expires_idx', 'api_key', 'expires'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_api_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    api_key = Column(String(255), nullable=False, unique=True)
 
    description = Column(UnicodeText(), nullable=False)
 
    expires = Column(Float(53), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.expires != -1) & (time.time() > self.expires)
 

	
 

	
 
class UserEmailMap(Base, BaseDbModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        Index('uem_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    email_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _email = Column("email", String(255), nullable=False, unique=True)
 
    user = relationship('User')
 

	
 
    @validates('_email')
 
    def validate_email(self, key, email):
 
        # check if this email is not main one
 
        main_email = Session().query(User).filter(User.email == email).scalar()
 
        if main_email is not None:
 
            raise AttributeError('email %s is present is user table' % email)
 
        return email
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 

	
 
class UserIpMap(Base, BaseDbModel):
 
    __tablename__ = 'user_ip_map'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'ip_addr'),
 
        _table_args_default_dict,
 
    )
 

	
 
    ip_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    ip_addr = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    user = relationship('User')
 

	
 
    @classmethod
 
    def _get_ip_range(cls, ip_addr):
 
        net = ipaddr.IPNetwork(address=ip_addr)
 
        return [str(net.network), str(net.broadcast)]
0 comments (0 inline, 0 general)