Changeset - d5c7930e3d5a
[Not reviewed]
default
0 3 0
Mads Kiilerich - 6 years ago 2020-03-30 16:44:10
mads@kiilerich.com
db: consistently use base_path from config instead of repeatedly getting from the database

Avoid using the inefficient Repository.base_path ... and avoid even more the
misleading Repository.repo_path .
3 files changed with 15 insertions and 44 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_cli_repo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
This file was forked by the Kallithea project in July 2014 and later moved.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Feb 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import datetime
 
import os
 
import re
 
import shutil
 

	
 
import click
 

	
 
import kallithea
 
import kallithea.bin.kallithea_cli_base as cli_base
 
from kallithea.lib.utils import REMOVED_REPO_PAT, repo2db_mapper
 
from kallithea.lib.utils2 import ask_ok
 
from kallithea.model.db import Repository, Ui
 
from kallithea.model.db import Repository
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.option('--remove-missing', is_flag=True,
 
        help='Remove missing repositories from the Kallithea database.')
 
def repo_scan(remove_missing):
 
    """Scan filesystem for repositories.
 

	
 
    Search the configured repository root for new repositories and add them
 
    into Kallithea.
 
    Additionally, report repositories that were previously known to Kallithea
 
    but are no longer present on the filesystem. If option --remove-missing is
 
    given, remove the missing repositories from the Kallithea database.
 
    """
 
    click.echo('Now scanning root location for new repos ...')
 
    added, removed = repo2db_mapper(ScmModel().repo_scan(),
 
                                    remove_obsolete=remove_missing)
 
    click.echo('Scan completed.')
 
    if added:
 
        click.echo('Added: %s' % ', '.join(added))
 
    if removed:
 
        click.echo('%s: %s' % ('Removed' if remove_missing else 'Missing',
 
                          ', '.join(removed)))
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.argument('repositories', nargs=-1)
 
def repo_update_metadata(repositories):
 
    """
 
    Update repository metadata in database from repository content.
 

	
 
    In normal operation, Kallithea will keep caches up-to-date
 
    automatically. However, if repositories are externally modified, e.g. by
 
    a direct push via the filesystem rather than via a Kallithea URL,
 
    Kallithea is not aware of it. In this case, you should manually run this
 
    command to update the repository cache.
 

	
 
    If no repositories are specified, the caches of all repositories are
 
    updated.
 
    """
 
    if not repositories:
 
        repo_list = Repository.query().all()
 
    else:
 
        repo_names = [n.strip() for n in repositories]
 
        repo_list = list(Repository.query()
 
                        .filter(Repository.repo_name.in_(repo_names)))
 

	
 
    for repo in repo_list:
 
        # update latest revision metadata in database
 
        repo.update_changeset_cache()
 
        # invalidate in-memory VCS object cache... will be repopulated on
 
        # first access
 
        repo.set_invalidate()
 

	
 
    Session().commit()
 

	
 
    click.echo('Updated database with information about latest change in the following %s repositories:' % (len(repo_list)))
 
    click.echo('\n'.join(repo.repo_name for repo in repo_list))
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.option('--ask/--no-ask', default=True, help='Ask for confirmation or not. Default is --ask.')
 
@click.option('--older-than',
 
        help="""Only purge repositories that have been removed at least the given time ago.
 
        For example, '--older-than=30d' purges repositories deleted 30 days ago or longer.
 
        Possible suffixes: d (days), h (hours), m (minutes), s (seconds).""")
 
def repo_purge_deleted(ask, older_than):
 
    """Purge backups of deleted repositories.
 

	
 
    When a repository is deleted via the Kallithea web interface, the actual
 
    data is still present on the filesystem but set aside using a special name.
 
    This command allows to delete these files permanently.
 
    """
 
    def _parse_older_than(val):
 
        regex = re.compile(r'((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)m)?((?P<seconds>\d+?)s)?')
 
        parts = regex.match(val)
 
        if not parts:
 
            return
 
        parts = parts.groupdict()
 
        time_params = {}
 
        for name, param in parts.items():
 
            if param:
 
                time_params[name] = int(param)
 
        return datetime.timedelta(**time_params)
 

	
 
    def _extract_date(name):
 
        """
 
        Extract the date part from rm__<date> pattern of removed repos,
 
        and convert it to datetime object
 

	
 
        :param name:
 
        """
 
        date_part = name[4:19]  # 4:19 since we don't parse milliseconds
 
        return datetime.datetime.strptime(date_part, '%Y%m%d_%H%M%S')
 

	
 
    repos_location = Ui.get_repos_location()
 
    repos_location = kallithea.CONFIG['base_path']
 
    to_remove = []
 
    for dn_, dirs, f in os.walk(repos_location):
 
        alldirs = list(dirs)
 
        del dirs[:]
 
        if ('.hg' in alldirs or
 
            '.git' in alldirs or
 
            '.svn' in alldirs or
 
            'objects' in alldirs and ('refs' in alldirs or 'packed-refs' in f)
 
        ):
 
            continue
 
        for loc in alldirs:
 
            if REMOVED_REPO_PAT.match(loc):
 
                to_remove.append([os.path.join(dn_, loc),
 
                                  _extract_date(loc)])
 
            else:
 
                dirs.append(loc)
 
        if dirs:
 
            click.echo('Scanning: %s' % dn_)
 

	
 
    if not to_remove:
 
        click.echo('There are no deleted repositories.')
 
        return
 

	
 
    # filter older than (if present)!
 
    if older_than:
 
        now = datetime.datetime.now()
 
        to_remove_filtered = []
 
        older_than_date = _parse_older_than(older_than)
 
        for name, date_ in to_remove:
 
            repo_age = now - date_
 
            if repo_age > older_than_date:
 
                to_remove_filtered.append([name, date_])
 

	
 
        to_remove = to_remove_filtered
 

	
 
        if not to_remove:
 
            click.echo('There are no deleted repositories older than %s (%s)'
 
                    % (older_than, older_than_date))
 
            return
 

	
 
        click.echo('Considering %s deleted repositories older than %s (%s).'
 
            % (len(to_remove), older_than, older_than_date))
 
    else:
 
        click.echo('Considering %s deleted repositories.' % len(to_remove))
 

	
 
    if not ask:
 
        remove = True
 
    else:
 
        remove = ask_ok('The following repositories will be removed completely:\n%s\n'
 
            'Do you want to proceed? [y/n] ' %
 
            '\n'.join('%s deleted on %s' % (path, date_) for path, date_ in to_remove))
 

	
 
    if remove:
 
        for path, date_ in to_remove:
 
            click.echo('Purging repository %s' % path)
 
            shutil.rmtree(path)
 
    else:
 
        click.echo('Nothing done, exiting...')
kallithea/lib/celerylib/tasks.py
Show inline comments
 
@@ -321,178 +321,178 @@ def send_email(recipients, subject, body
 

	
 

	
 
@celerylib.task
 
@celerylib.dbsession
 
def create_repo(form_data, cur_user):
 
    from kallithea.model.repo import RepoModel
 
    from kallithea.model.db import Setting
 

	
 
    DBS = celerylib.get_session()
 

	
 
    cur_user = User.guess_instance(cur_user)
 

	
 
    owner = cur_user
 
    repo_name = form_data['repo_name']
 
    repo_name_full = form_data['repo_name_full']
 
    repo_type = form_data['repo_type']
 
    description = form_data['repo_description']
 
    private = form_data['repo_private']
 
    clone_uri = form_data.get('clone_uri')
 
    repo_group = form_data['repo_group']
 
    landing_rev = form_data['repo_landing_rev']
 
    copy_fork_permissions = form_data.get('copy_permissions')
 
    copy_group_permissions = form_data.get('repo_copy_permissions')
 
    fork_of = form_data.get('fork_parent_id')
 
    state = form_data.get('repo_state', Repository.STATE_PENDING)
 

	
 
    # repo creation defaults, private and repo_type are filled in form
 
    defs = Setting.get_default_repo_settings(strip_prefix=True)
 
    enable_statistics = defs.get('repo_enable_statistics')
 
    enable_downloads = defs.get('repo_enable_downloads')
 

	
 
    try:
 
        repo = RepoModel()._create_repo(
 
            repo_name=repo_name_full,
 
            repo_type=repo_type,
 
            description=description,
 
            owner=owner,
 
            private=private,
 
            clone_uri=clone_uri,
 
            repo_group=repo_group,
 
            landing_rev=landing_rev,
 
            fork_of=fork_of,
 
            copy_fork_permissions=copy_fork_permissions,
 
            copy_group_permissions=copy_group_permissions,
 
            enable_statistics=enable_statistics,
 
            enable_downloads=enable_downloads,
 
            state=state
 
        )
 

	
 
        action_logger(cur_user, 'user_created_repo',
 
                      form_data['repo_name_full'], '')
 

	
 
        DBS.commit()
 
        # now create this repo on Filesystem
 
        RepoModel()._create_filesystem_repo(
 
            repo_name=repo_name,
 
            repo_type=repo_type,
 
            repo_group=RepoGroup.guess_instance(repo_group),
 
            clone_uri=clone_uri,
 
        )
 
        repo = Repository.get_by_repo_name(repo_name_full)
 
        log_create_repository(repo.get_dict(), created_by=owner.username)
 

	
 
        # update repo changeset caches initially
 
        repo.update_changeset_cache()
 

	
 
        # set new created state
 
        repo.set_state(Repository.STATE_CREATED)
 
        DBS.commit()
 
    except Exception as e:
 
        log.warning('Exception %s occurred when forking repository, '
 
                    'doing cleanup...' % e)
 
        # rollback things manually !
 
        repo = Repository.get_by_repo_name(repo_name_full)
 
        if repo:
 
            Repository.delete(repo.repo_id)
 
            DBS.commit()
 
            RepoModel()._delete_filesystem_repo(repo)
 
        raise
 

	
 
    return True
 

	
 

	
 
@celerylib.task
 
@celerylib.dbsession
 
def create_repo_fork(form_data, cur_user):
 
    """
 
    Creates a fork of repository using interval VCS methods
 

	
 
    :param form_data:
 
    :param cur_user:
 
    """
 
    from kallithea.model.repo import RepoModel
 

	
 
    DBS = celerylib.get_session()
 

	
 
    base_path = Repository.base_path()
 
    base_path = kallithea.CONFIG['base_path']
 
    cur_user = User.guess_instance(cur_user)
 

	
 
    repo_name = form_data['repo_name']  # fork in this case
 
    repo_name_full = form_data['repo_name_full']
 

	
 
    repo_type = form_data['repo_type']
 
    owner = cur_user
 
    private = form_data['private']
 
    clone_uri = form_data.get('clone_uri')
 
    repo_group = form_data['repo_group']
 
    landing_rev = form_data['landing_rev']
 
    copy_fork_permissions = form_data.get('copy_permissions')
 

	
 
    try:
 
        fork_of = Repository.guess_instance(form_data.get('fork_parent_id'))
 

	
 
        RepoModel()._create_repo(
 
            repo_name=repo_name_full,
 
            repo_type=repo_type,
 
            description=form_data['description'],
 
            owner=owner,
 
            private=private,
 
            clone_uri=clone_uri,
 
            repo_group=repo_group,
 
            landing_rev=landing_rev,
 
            fork_of=fork_of,
 
            copy_fork_permissions=copy_fork_permissions
 
        )
 
        action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
 
                      fork_of.repo_name, '')
 
        DBS.commit()
 

	
 
        source_repo_path = os.path.join(base_path, fork_of.repo_name)
 

	
 
        # now create this repo on Filesystem
 
        RepoModel()._create_filesystem_repo(
 
            repo_name=repo_name,
 
            repo_type=repo_type,
 
            repo_group=RepoGroup.guess_instance(repo_group),
 
            clone_uri=source_repo_path,
 
        )
 
        repo = Repository.get_by_repo_name(repo_name_full)
 
        log_create_repository(repo.get_dict(), created_by=owner.username)
 

	
 
        # update repo changeset caches initially
 
        repo.update_changeset_cache()
 

	
 
        # set new created state
 
        repo.set_state(Repository.STATE_CREATED)
 
        DBS.commit()
 
    except Exception as e:
 
        log.warning('Exception %s occurred when forking repository, '
 
                    'doing cleanup...' % e)
 
        # rollback things manually !
 
        repo = Repository.get_by_repo_name(repo_name_full)
 
        if repo:
 
            Repository.delete(repo.repo_id)
 
            DBS.commit()
 
            RepoModel()._delete_filesystem_repo(repo)
 
        raise
 

	
 
    return True
 

	
 

	
 
def __get_codes_stats(repo_name):
 
    from kallithea.config.conf import LANGUAGES_EXTENSIONS_MAP
 
    repo = Repository.get_by_repo_name(repo_name).scm_instance
 

	
 
    tip = repo.get_changeset()
 
    code_stats = {}
 

	
 
    for _topnode, _dirnodes, filenodes in tip.walk('/'):
 
        for filenode in filenodes:
 
            ext = filenode.extension.lower()
 
            if ext in LANGUAGES_EXTENSIONS_MAP and not filenode.is_binary:
 
                if ext in code_stats:
 
                    code_stats[ext] += 1
 
                else:
 
                    code_stats[ext] = 1
 

	
 
    return code_stats or {}
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import logging
 
import os
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
from beaker.cache import cache_region, region_invalidate
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import (Optional, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, str2bool,
 
                                  urlreadable)
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.meta import Base, Session
 

	
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
def _hash_key(k):
 
    return hashlib.md5(safe_bytes(k)).hexdigest()
 

	
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        # Note: not a normal dict - iterator gives "users.firstname", but keys gives "firstname"
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.items():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        return [
 
            (k, getattr(self, k))
 
            for k in self._get_keys()
 
        ]
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, int):
 
            return cls.get(value)
 
        if isinstance(value, str) and value.isdigit():
 
            return cls.get(int(value))
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 
@@ -993,301 +992,283 @@ class Repository(Base, BaseDbModel):
 
                             primaryjoin='UserFollowing.follows_repository_id==Repository.repo_id',
 
                             cascade='all')
 
    extra_fields = relationship('RepositoryField',
 
                                cascade="all, delete-orphan")
 

	
 
    logs = relationship('UserLog')
 
    comments = relationship('ChangesetComment', cascade="all, delete-orphan")
 

	
 
    pull_requests_org = relationship('PullRequest',
 
                    primaryjoin='PullRequest.org_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    pull_requests_other = relationship('PullRequest',
 
                    primaryjoin='PullRequest.other_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r>" % (self.__class__.__name__,
 
                                  self.repo_id, self.repo_name)
 

	
 
    @hybrid_property
 
    def landing_rev(self):
 
        # always should return [rev_type, rev]
 
        if self._landing_revision:
 
            _rev_info = self._landing_revision.split(':')
 
            if len(_rev_info) < 2:
 
                _rev_info.insert(0, 'rev')
 
            return [_rev_info[0], _rev_info[1]]
 
        return [None, None]
 

	
 
    @landing_rev.setter
 
    def landing_rev(self, val):
 
        if ':' not in val:
 
            raise ValueError('value must be delimited with `:` and consist '
 
                             'of <rev_type>:<rev>, got %s instead' % val)
 
        self._landing_revision = val
 

	
 
    @hybrid_property
 
    def changeset_cache(self):
 
        try:
 
            cs_cache = ext_json.loads(self._changeset_cache) # might raise on bad data
 
            cs_cache['raw_id'] # verify data, raise exception on error
 
            return cs_cache
 
        except (TypeError, KeyError, ValueError):
 
            return EmptyChangeset().__json__()
 

	
 
    @changeset_cache.setter
 
    def changeset_cache(self, val):
 
        try:
 
            self._changeset_cache = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    @classmethod
 
    def query(cls, sorted=False):
 
        """Add Repository-specific helpers for common query constructs.
 

	
 
        sorted: if True, apply the default ordering (name, case insensitive).
 
        """
 
        q = super(Repository, cls).query()
 

	
 
        if sorted:
 
            q = q.order_by(sqlalchemy.func.lower(Repository.repo_name))
 

	
 
        return q
 

	
 
    @classmethod
 
    def normalize_repo_name(cls, repo_name):
 
        """
 
        Normalizes os specific repo_name to the format internally stored inside
 
        database using URL_SEP
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        return URL_SEP.join(repo_name.split(os.sep))
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(Repository, cls).guess_instance(value, Repository.get_by_repo_name)
 

	
 
    @classmethod
 
    def get_by_repo_name(cls, repo_name, case_insensitive=False):
 
        """Get the repo, defaulting to database case sensitivity.
 
        case_insensitive will be slower and should only be specified if necessary."""
 
        if case_insensitive:
 
            q = Session().query(cls).filter(sqlalchemy.func.lower(cls.repo_name) == sqlalchemy.func.lower(repo_name))
 
        else:
 
            q = Session().query(cls).filter(cls.repo_name == repo_name)
 
        q = q.options(joinedload(Repository.fork)) \
 
                .options(joinedload(Repository.owner)) \
 
                .options(joinedload(Repository.group))
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_full_path(cls, repo_full_path):
 
        base_full_path = os.path.realpath(cls.base_path())
 
        base_full_path = os.path.realpath(kallithea.CONFIG['base_path'])
 
        repo_full_path = os.path.realpath(repo_full_path)
 
        assert repo_full_path.startswith(base_full_path + os.path.sep)
 
        repo_name = repo_full_path[len(base_full_path) + 1:]
 
        repo_name = cls.normalize_repo_name(repo_name)
 
        return cls.get_by_repo_name(repo_name.strip(URL_SEP))
 

	
 
    @classmethod
 
    def get_repo_forks(cls, repo_id):
 
        return cls.query().filter(Repository.fork_id == repo_id)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path where all repos are stored
 

	
 
        :param cls:
 
        """
 
        q = Session().query(Ui) \
 
            .filter(Ui.ui_key == URL_SEP)
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return q.one().ui_value
 

	
 
    @property
 
    def forks(self):
 
        """
 
        Return forks of this repo
 
        """
 
        return Repository.get_repo_forks(self.repo_id)
 

	
 
    @property
 
    def parent(self):
 
        """
 
        Returns fork parent
 
        """
 
        return self.fork
 

	
 
    @property
 
    def just_name(self):
 
        return self.repo_name.split(URL_SEP)[-1]
 

	
 
    @property
 
    def groups_with_parents(self):
 
        groups = []
 
        group = self.group
 
        while group is not None:
 
            groups.append(group)
 
            group = group.parent_group
 
            assert group not in groups, group # avoid recursion on bad db content
 
        groups.reverse()
 
        return groups
 

	
 
    @LazyProperty
 
    def repo_path(self):
 
        """
 
        Returns base full path for that repository means where it actually
 
        exists on a filesystem
 
        """
 
        q = Session().query(Ui).filter(Ui.ui_key == URL_SEP)
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return q.one().ui_value
 

	
 
    @property
 
    def repo_full_path(self):
 
        p = [self.repo_path]
 
        """
 
        Returns base full path for the repository - where it actually
 
        exists on a filesystem.
 
        """
 
        p = [kallithea.CONFIG['base_path']]
 
        # we need to split the name by / since this is how we store the
 
        # names in the database, but that eventually needs to be converted
 
        # into a valid system path
 
        p += self.repo_name.split(URL_SEP)
 
        return os.path.join(*p)
 

	
 
    @property
 
    def cache_keys(self):
 
        """
 
        Returns associated cache keys for that repo
 
        """
 
        return CacheInvalidation.query() \
 
            .filter(CacheInvalidation.cache_args == self.repo_name) \
 
            .order_by(CacheInvalidation.cache_key) \
 
            .all()
 

	
 
    def get_new_name(self, repo_name):
 
        """
 
        returns new full repository name based on assigned group and new new
 

	
 
        :param group_name:
 
        """
 
        path_prefix = self.group.full_path_splitted if self.group else []
 
        return URL_SEP.join(path_prefix + [repo_name])
 

	
 
    @property
 
    def _ui(self):
 
        """
 
        Creates an db based ui object for this repository
 
        """
 
        from kallithea.lib.utils import make_ui
 
        return make_ui()
 

	
 
    @classmethod
 
    def is_valid(cls, repo_name):
 
        """
 
        returns True if given repo name is a valid filesystem repository
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        from kallithea.lib.utils import is_valid_repo
 

	
 
        return is_valid_repo(repo_name, cls.base_path())
 
        return is_valid_repo(repo_name, kallithea.CONFIG['base_path'])
 

	
 
    def get_api_data(self, with_revision_names=False,
 
                           with_pullrequests=False):
 
        """
 
        Common function for generating repo api data.
 
        Optionally, also return tags, branches, bookmarks and PRs.
 
        """
 
        repo = self
 
        data = dict(
 
            repo_id=repo.repo_id,
 
            repo_name=repo.repo_name,
 
            repo_type=repo.repo_type,
 
            clone_uri=repo.clone_uri,
 
            private=repo.private,
 
            created_on=repo.created_on,
 
            description=repo.description,
 
            landing_rev=repo.landing_rev,
 
            owner=repo.owner.username,
 
            fork_of=repo.fork.repo_name if repo.fork else None,
 
            enable_statistics=repo.enable_statistics,
 
            enable_downloads=repo.enable_downloads,
 
            last_changeset=repo.changeset_cache,
 
        )
 
        if with_revision_names:
 
            scm_repo = repo.scm_instance_no_cache()
 
            data.update(dict(
 
                tags=scm_repo.tags,
 
                branches=scm_repo.branches,
 
                bookmarks=scm_repo.bookmarks,
 
            ))
 
        if with_pullrequests:
 
            data['pull_requests'] = repo.pull_requests_other
 
        rc_config = Setting.get_app_settings()
 
        repository_fields = str2bool(rc_config.get('repository_fields'))
 
        if repository_fields:
 
            for f in self.extra_fields:
 
                data[f.field_key_prefixed] = f.field_value
 

	
 
        return data
 

	
 
    @property
 
    def last_db_change(self):
 
        return self.updated_on
 

	
 
    @property
 
    def clone_uri_hidden(self):
 
        clone_uri = self.clone_uri
 
        if clone_uri:
 
            import urlobject
 
            url_obj = urlobject.URLObject(self.clone_uri)
 
            if url_obj.password:
 
                clone_uri = url_obj.with_password('*****')
 
        return clone_uri
 

	
 
    def clone_url(self, clone_uri_tmpl, with_id=False, username=None):
 
        if '{repo}' not in clone_uri_tmpl and '_{repoid}' not in clone_uri_tmpl:
 
            log.error("Configured clone_uri_tmpl %r has no '{repo}' or '_{repoid}' and cannot toggle to use repo id URLs", clone_uri_tmpl)
 
        elif with_id:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('{repo}', '_{repoid}')
 
        else:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('_{repoid}', '{repo}')
 

	
 
        import kallithea.lib.helpers as h
 
        prefix_url = h.canonical_url('home')
 

	
 
        return get_clone_url(clone_uri_tmpl=clone_uri_tmpl,
 
                             prefix_url=prefix_url,
 
                             repo_name=self.repo_name,
 
                             repo_id=self.repo_id,
 
                             username=username)
 

	
 
    def set_state(self, state):
 
        self.repo_state = state
 

	
 
    #==========================================================================
 
    # SCM PROPERTIES
 
    #==========================================================================
 

	
 
    def get_changeset(self, rev=None):
 
        return get_changeset_safe(self.scm_instance, rev)
 

	
 
    def get_landing_changeset(self):
 
        """
 
        Returns landing changeset, or if that doesn't exist returns the tip
 
        """
 
        _rev_type, _rev = self.landing_rev
 
        cs = self.get_changeset(_rev)
 
        if isinstance(cs, EmptyChangeset):
 
            return self.get_changeset()
 
        return cs
 

	
 
    def update_changeset_cache(self, cs_cache=None):
 
        """
 
        Update cache of last changeset for repository, keys should be::
 

	
 
            short_id
 
@@ -2373,169 +2354,158 @@ class PullRequest(Base, BaseDbModel):
 
        else:
 
            s = '/_/' + self.title
 
        kwargs['extra'] = urlreadable(s)
 
        if canonical:
 
            return h.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                                   pull_request_id=self.pull_request_id, **kwargs)
 
        return h.url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                     pull_request_id=self.pull_request_id, **kwargs)
 

	
 

	
 
class PullRequestReviewer(Base, BaseDbModel):
 
    __tablename__ = 'pull_request_reviewers'
 
    __table_args__ = (
 
        Index('pull_request_reviewers_user_id_idx', 'user_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    def __init__(self, user=None, pull_request=None):
 
        self.user = user
 
        self.pull_request = pull_request
 

	
 
    pull_request_reviewers_id = Column('pull_requests_reviewers_id', Integer(), primary_key=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    pull_request = relationship('PullRequest')
 

	
 
    def __json__(self):
 
        return dict(
 
            username=self.user.username if self.user else None,
 
        )
 

	
 

	
 
class Notification(object):
 
    __tablename__ = 'notifications'
 

	
 
class UserNotification(object):
 
    __tablename__ = 'user_to_notification'
 

	
 

	
 
class Gist(Base, BaseDbModel):
 
    __tablename__ = 'gists'
 
    __table_args__ = (
 
        Index('g_gist_access_id_idx', 'gist_access_id'),
 
        Index('g_created_on_idx', 'created_on'),
 
        _table_args_default_dict,
 
    )
 

	
 
    GIST_PUBLIC = 'public'
 
    GIST_PRIVATE = 'private'
 
    DEFAULT_FILENAME = 'gistfile1.txt'
 

	
 
    gist_id = Column(Integer(), primary_key=True)
 
    gist_access_id = Column(Unicode(250), nullable=False)
 
    gist_description = Column(UnicodeText(), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    gist_expires = Column(Float(53), nullable=False)
 
    gist_type = Column(Unicode(128), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.gist_expires != -1) & (time.time() > self.gist_expires)
 

	
 
    def __repr__(self):
 
        return "<%s %s %s>" % (
 
            self.__class__.__name__,
 
            self.gist_type, self.gist_access_id)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(Gist, cls).guess_instance(value, Gist.get_by_access_id)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        res = cls.query().filter(cls.gist_access_id == id_).scalar()
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def get_by_access_id(cls, gist_access_id):
 
        return cls.query().filter(cls.gist_access_id == gist_access_id).scalar()
 

	
 
    def gist_url(self):
 
        alias_url = kallithea.CONFIG.get('gist_alias_url')
 
        if alias_url:
 
            return alias_url.replace('{gistid}', self.gist_access_id)
 

	
 
        import kallithea.lib.helpers as h
 
        return h.canonical_url('gist', gist_id=self.gist_access_id)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path where all gists are stored
 

	
 
        :param cls:
 
        """
 
        from kallithea.model.gist import GIST_STORE_LOC
 
        q = Session().query(Ui) \
 
            .filter(Ui.ui_key == URL_SEP)
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return os.path.join(q.one().ui_value, GIST_STORE_LOC)
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating gist related data for API
 
        """
 
        gist = self
 
        data = dict(
 
            gist_id=gist.gist_id,
 
            type=gist.gist_type,
 
            access_id=gist.gist_access_id,
 
            description=gist.gist_description,
 
            url=gist.gist_url(),
 
            expires=gist.gist_expires,
 
            created_on=gist.created_on,
 
        )
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 
    ## SCM functions
 

	
 
    @property
 
    def scm_instance(self):
 
        from kallithea.lib.vcs import get_repo
 
        base_path = self.base_path()
 
        return get_repo(os.path.join(base_path, self.gist_access_id))
 
        from kallithea.model.gist import GIST_STORE_LOC
 
        gist_base_path = os.path.join(kallithea.CONFIG['base_path'], GIST_STORE_LOC)
 
        return get_repo(os.path.join(gist_base_path, self.gist_access_id))
 

	
 

	
 
class UserSshKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_ssh_keys'
 
    __table_args__ = (
 
        Index('usk_fingerprint_idx', 'fingerprint'),
 
        UniqueConstraint('fingerprint'),
 
        _table_args_default_dict
 
    )
 
    __mapper_args__ = {}
 

	
 
    user_ssh_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _public_key = Column('public_key', UnicodeText(), nullable=False)
 
    description = Column(UnicodeText(), nullable=False)
 
    fingerprint = Column(String(255), nullable=False, unique=True)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    last_seen = Column(DateTime(timezone=False), nullable=True)
 

	
 
    user = relationship('User')
 

	
 
    @property
 
    def public_key(self):
 
        return self._public_key
 

	
 
    @public_key.setter
 
    def public_key(self, full_key):
 
        # the full public key is too long to be suitable as database key - instead,
 
        # use fingerprints similar to 'ssh-keygen -E sha256 -lf ~/.ssh/id_rsa.pub'
 
        self._public_key = full_key
 
        enc_key = safe_bytes(full_key.split(" ")[1])
 
        self.fingerprint = base64.b64encode(hashlib.sha256(base64.b64decode(enc_key)).digest()).replace(b'\n', b'').rstrip(b'=').decode()
0 comments (0 inline, 0 general)