Changeset - 1e52ed5c37aa
[Not reviewed]
default
0 7 0
Søren Løvborg - 9 years ago 2016-08-03 15:31:23
sorenl@unity3d.com
db: remove deprecated getAll method

Not sure why this wasn't done immediately when get_all was added. Maybe
search and replace was out of order at the time.
7 files changed with 7 insertions and 12 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/api/api.py
Show inline comments
 
@@ -364,193 +364,193 @@ class ApiController(JSONRPCController):
 
            raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        if isinstance(userid, Optional):
 
            userid = apiuser.user_id
 

	
 
        user = get_user_or_error(userid)
 

	
 
        if isinstance(locked, Optional):
 
            lockobj = Repository.getlock(repo)
 

	
 
            if lockobj[0] is None:
 
                _d = {
 
                    'repo': repo.repo_name,
 
                    'locked': False,
 
                    'locked_since': None,
 
                    'locked_by': None,
 
                    'lock_state_changed': False,
 
                    'msg': 'Repo `%s` not locked.' % repo.repo_name
 
                }
 
                return _d
 
            else:
 
                userid, time_ = lockobj
 
                lock_user = get_user_or_error(userid)
 
                _d = {
 
                    'repo': repo.repo_name,
 
                    'locked': True,
 
                    'locked_since': time_,
 
                    'locked_by': lock_user.username,
 
                    'lock_state_changed': False,
 
                    'msg': ('Repo `%s` locked by `%s` on `%s`.'
 
                            % (repo.repo_name, lock_user.username,
 
                               json.dumps(time_to_datetime(time_))))
 
                }
 
                return _d
 

	
 
        # force locked state through a flag
 
        else:
 
            locked = str2bool(locked)
 
            try:
 
                if locked:
 
                    lock_time = time.time()
 
                    Repository.lock(repo, user.user_id, lock_time)
 
                else:
 
                    lock_time = None
 
                    Repository.unlock(repo)
 
                _d = {
 
                    'repo': repo.repo_name,
 
                    'locked': locked,
 
                    'locked_since': lock_time,
 
                    'locked_by': user.username,
 
                    'lock_state_changed': True,
 
                    'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                            % (user.username, repo.repo_name, locked))
 
                }
 
                return _d
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise JSONRPCError(
 
                    'Error occurred locking repository `%s`' % repo.repo_name
 
                )
 

	
 
    def get_locks(self, apiuser, userid=Optional(OAttr('apiuser'))):
 
        """
 
        Get all repositories with locks for given userid, if
 
        this command is run by non-admin account userid is set to user
 
        who is calling this method, thus returning locks for himself.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param userid: User to get locks for
 
        :type userid: Optional(str or int)
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            [repo_object, repo_object,...]
 
          }
 
          error :  null
 
        """
 

	
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != apiuser.user_id:
 
                raise JSONRPCError(
 
                    'userid is not the same as your user'
 
                )
 

	
 
        ret = []
 
        if isinstance(userid, Optional):
 
            user = None
 
        else:
 
            user = get_user_or_error(userid)
 

	
 
        # show all locks
 
        for r in Repository.getAll():
 
        for r in Repository.get_all():
 
            userid, time_ = r.locked
 
            if time_:
 
                _api_data = r.get_api_data()
 
                # if we use userfilter just show the locks for this user
 
                if user is not None:
 
                    if safe_int(userid) == user.user_id:
 
                        ret.append(_api_data)
 
                else:
 
                    ret.append(_api_data)
 

	
 
        return ret
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_ip(self, apiuser, userid=Optional(OAttr('apiuser'))):
 
        """
 
        Shows IP address as seen from Kallithea server, together with all
 
        defined IP addresses for given user. If userid is not passed data is
 
        returned for user who's calling this function.
 
        This command can be executed only using api_key belonging to user with
 
        admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param userid: username to show ips for
 
        :type userid: Optional(str or int)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result : {
 
                         "server_ip_addr": "<ip_from_clien>",
 
                         "user_ips": [
 
                                        {
 
                                           "ip_addr": "<ip_with_mask>",
 
                                           "ip_range": ["<start_ip>", "<end_ip>"],
 
                                        },
 
                                        ...
 
                                     ]
 
            }
 

	
 
        """
 
        if isinstance(userid, Optional):
 
            userid = apiuser.user_id
 
        user = get_user_or_error(userid)
 
        ips = UserIpMap.query().filter(UserIpMap.user == user).all()
 
        return dict(
 
            server_ip_addr=self.ip_addr,
 
            user_ips=ips
 
        )
 

	
 
    # alias for old
 
    show_ip = get_ip
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_server_info(self, apiuser):
 
        """
 
        return server info, including Kallithea version and installed packages
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            'modules': [<module name>,...]
 
            'py_version': <python version>,
 
            'platform': <platform type>,
 
            'kallithea_version': <kallithea version>
 
          }
 
          error :  null
 
        """
 
        return Setting.get_server_info()
 

	
 
    def get_user(self, apiuser, userid=Optional(OAttr('apiuser'))):
 
        """
 
        Gets a user by username or user_id, Returns empty result if user is
 
        not found. If userid param is skipped it is set to id of user who is
 
        calling this method. This command can be executed only using api_key
 
        belonging to user with admin rights, or regular users that cannot
 
        specify different userid than theirs
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param userid: user to get data for
 
        :type userid: Optional(str or int)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: None if user does not exist or
 
                    {
 
                        "user_id" :     "<user_id>",
 
                        "api_key" :     "<api_key>",
 
                        "api_keys":     "[<list of all API keys including additional ones>]"
 
                        "username" :    "<username>",
kallithea/lib/paster_commands/update_repoinfo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.update_repoinfo
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
update-repoinfo paster command for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jul 14, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import os
 
import sys
 
import string
 

	
 
from kallithea.lib.paster_commands.common import BasePasterCommand
 
from kallithea.lib.utils2 import safe_unicode
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.meta import Session
 

	
 
# Add location of top level folder to sys.path
 
from os.path import dirname
 
rc_path = dirname(dirname(dirname(os.path.realpath(__file__))))
 
sys.path.append(rc_path)
 

	
 

	
 
class Command(BasePasterCommand):
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    group_name = "Kallithea"
 
    takes_config_file = -1
 
    parser = BasePasterCommand.standard_parser(verbose=True)
 
    summary = "Updates repositories caches for last changeset"
 

	
 
    def command(self):
 
        #get SqlAlchemy session
 
        self._init_session()
 

	
 

	
 
        if self.options.repo_update_list is None:
 
            repo_list = Repository.getAll()
 
            repo_list = Repository.get_all()
 
        else:
 
            repo_names = [safe_unicode(n.strip())
 
                          for n in self.options.repo_update_list.split(',')]
 
            repo_list = list(Repository.query()
 
                .filter(Repository.repo_name.in_(repo_names)))
 
        RepoModel.update_repoinfo(repositories=repo_list)
 
        Session().commit()
 

	
 
        if self.options.invalidate_cache:
 
            for r in repo_list:
 
                r.set_invalidate()
 
        print 'Updated cache for %s repositories' % (len(repo_list))
 

	
 
    def update_parser(self):
 
        self.parser.add_option('--update-only',
 
                           action='store',
 
                           dest='repo_update_list',
 
                           help="Specifies a comma separated list of repositories "
 
                                "to update last commit info for. OPTIONAL")
 
        self.parser.add_option('--invalidate-cache',
 
                           action='store_true',
 
                           dest='invalidate_cache',
 
                           help="Trigger cache invalidation event for repos. "
 
                                "OPTIONAL")
kallithea/model/__init__.py
Show inline comments
 
@@ -45,97 +45,97 @@ Original author and date, and relevant c
 

	
 

	
 
import logging
 
from kallithea.model import meta
 
from kallithea.lib.utils2 import safe_str, obfuscate_url_pw
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def init_model(engine):
 
    """
 
    Initializes db session, bind the engine with the metadata,
 
    Call this before using any of the tables or classes in the model,
 
    preferably once in application start
 

	
 
    :param engine: engine to bind to
 
    """
 
    engine_str = obfuscate_url_pw(str(engine.url))
 
    log.info("initializing db for %s", engine_str)
 
    meta.Base.metadata.bind = engine
 

	
 

	
 
class BaseModel(object):
 
    """
 
    Base Model for all Kallithea models, it adds sql alchemy session
 
    into instance of model
 

	
 
    :param sa: If passed it reuses this session instead of creating a new one
 
    """
 

	
 
    cls = None  # override in child class
 

	
 
    def __init__(self, sa=None):
 
        if sa is not None:
 
            self.sa = sa
 
        else:
 
            self.sa = meta.Session()
 

	
 
    def _get_instance(self, cls, instance, callback=None):
 
        """
 
        Gets instance of given cls using some simple lookup mechanism.
 

	
 
        :param cls: class to fetch
 
        :param instance: int or Instance
 
        :param callback: callback to call if all lookups failed
 
        """
 

	
 
        if isinstance(instance, cls):
 
            return instance
 
        elif isinstance(instance, (int, long)) or safe_str(instance).isdigit():
 
            return cls.get(instance)
 
        else:
 
            if instance is not None:
 
                if callback is None:
 
                    raise Exception(
 
                        'given object must be int, long or Instance of %s '
 
                        'got %s, no callback provided' % (cls, type(instance))
 
                    )
 
                else:
 
                    return callback(instance)
 

	
 
    def _get_user(self, user):
 
        """
 
        Helper method to get user by ID, or username fallback
 

	
 
        :param user: UserID, username, or User instance
 
        """
 
        from kallithea.model.db import User
 
        return self._get_instance(User, user,
 
                                  callback=User.get_by_username)
 

	
 
    def _get_repo(self, repository):
 
        """
 
        Helper method to get repository by ID, or repository name
 

	
 
        :param repository: RepoID, repository name or Repository Instance
 
        """
 
        from kallithea.model.db import Repository
 
        return self._get_instance(Repository, repository,
 
                                  callback=Repository.get_by_repo_name)
 

	
 
    def _get_perm(self, permission):
 
        """
 
        Helper method to get permission by ID, or permission name
 

	
 
        :param permission: PermissionID, permission_name or Permission instance
 
        """
 
        from kallithea.model.db import Permission
 
        return self._get_instance(Permission, permission,
 
                                  callback=Permission.get_by_key)
 

	
 
    @classmethod
 
    def get_all(cls):
 
        """
 
        Returns all instances of what is defined in `cls` class variable
 
        """
 
        return cls.cls.getAll()
 
        return cls.cls.get_all()
kallithea/model/db.py
Show inline comments
 
@@ -41,197 +41,192 @@ from sqlalchemy.orm import relationship,
 
from beaker.cache import cache_region, region_invalidate
 
from webob.exc import HTTPNotFound
 

	
 
from pylons.i18n.translation import lazy_ugettext as _
 

	
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea.lib.utils2 import str2bool, safe_str, get_changeset_safe, \
 
    safe_unicode, remove_prefix, time_to_datetime, aslist, Optional, safe_int, \
 
    get_clone_url, urlreadable
 
from kallithea.lib.compat import json
 
from kallithea.lib.caching_query import FromCache
 

	
 
from kallithea.model.meta import Base, Session
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
_hash_key = lambda k: hashlib.md5(safe_str(k)).hexdigest()
 

	
 

	
 
class BaseModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.iteritems():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        l = []
 
        for k in self._get_keys():
 
            l.append((k, getattr(self, k),))
 
        return l
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def getAll(cls):
 
        # deprecated and left for backward compatibility
 
        return cls.get_all()
 

	
 
    @classmethod
 
    def get_all(cls):
 
        return cls.query().all()
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        Session().delete(obj)
 

	
 
    def __repr__(self):
 
        if hasattr(self, '__unicode__'):
 
            # python repr needs to return str
 
            try:
 
                return safe_str(self.__unicode__())
 
            except UnicodeDecodeError:
 
                pass
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'mysql_charset': 'utf8',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(Base, BaseModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_str,
 
        'int': safe_int,
 
        'unicode': safe_unicode,
 
        'bool': str2bool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 
    DEFAULT_UPDATE_URL = ''
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert type(val) == unicode
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use unicode in app_settings_value
 

	
 
        :param val:
 
        """
 
        self._app_settings_value = safe_unicode(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (self.SETTINGS_TYPES.keys(), val))
 
        self._app_settings_type = val
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s[%s]')>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_value, self.app_settings_type
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
kallithea/model/repo.py
Show inline comments
 
@@ -87,193 +87,193 @@ class RepoModel(BaseModel):
 
        return repo_to_perm
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = self.sa.query(Ui).filter(Ui.ui_key == '/').one()
 
        return q.ui_value
 

	
 
    def get(self, repo_id, cache=False):
 
        repo = self.sa.query(Repository) \
 
            .filter(Repository.repo_id == repo_id)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_id))
 
        return repo.scalar()
 

	
 
    def get_repo(self, repository):
 
        return self._get_repo(repository)
 

	
 
    def get_by_repo_name(self, repo_name, cache=False):
 
        repo = self.sa.query(Repository) \
 
            .filter(Repository.repo_name == repo_name)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_name))
 
        return repo.scalar()
 

	
 
    def get_all_user_repos(self, user):
 
        """
 
        Gets all repositories that user have at least read access
 

	
 
        :param user:
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        user = self._get_user(user)
 
        repos = AuthUser(user_id=user.user_id).permissions['repositories']
 
        access_check = lambda r: r[1] in ['repository.read',
 
                                          'repository.write',
 
                                          'repository.admin']
 
        repos = [x[0] for x in filter(access_check, repos.items())]
 
        return Repository.query().filter(Repository.repo_name.in_(repos))
 

	
 
    def get_users_js(self):
 
        users = self.sa.query(User) \
 
            .filter(User.active == True) \
 
            .order_by(User.name, User.lastname) \
 
            .all()
 
        return json.dumps([
 
            {
 
                'id': u.user_id,
 
                'fname': h.escape(u.name),
 
                'lname': h.escape(u.lastname),
 
                'nname': u.username,
 
                'gravatar_lnk': h.gravatar_url(u.email, size=28, default='default'),
 
                'gravatar_size': 14,
 
            } for u in users]
 
        )
 

	
 
    def get_user_groups_js(self):
 
        user_groups = self.sa.query(UserGroup) \
 
            .filter(UserGroup.users_group_active == True) \
 
            .order_by(UserGroup.users_group_name) \
 
            .options(subqueryload(UserGroup.members)) \
 
            .all()
 
        user_groups = UserGroupList(user_groups, perm_set=['usergroup.read',
 
                                                           'usergroup.write',
 
                                                           'usergroup.admin'])
 
        return json.dumps([
 
            {
 
                'id': gr.users_group_id,
 
                'grname': gr.users_group_name,
 
                'grmembers': len(gr.members),
 
            } for gr in user_groups]
 
        )
 

	
 
    @classmethod
 
    def _render_datatable(cls, tmpl, *args, **kwargs):
 
        import kallithea
 
        from pylons import tmpl_context as c
 
        from pylons.i18n.translation import _
 

	
 
        _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        tmpl = template.get_def(tmpl)
 
        kwargs.update(dict(_=_, h=h, c=c))
 
        return tmpl.render(*args, **kwargs)
 

	
 
    @classmethod
 
    def update_repoinfo(cls, repositories=None):
 
        if repositories is None:
 
            repositories = Repository.getAll()
 
            repositories = Repository.get_all()
 
        for repo in repositories:
 
            repo.update_changeset_cache()
 

	
 
    def get_repos_as_dict(self, repos_list=None, admin=False, perm_check=True,
 
                          super_user_actions=False, short_name=False):
 
        _render = self._render_datatable
 
        from pylons import tmpl_context as c
 

	
 
        def quick_menu(repo_name):
 
            return _render('quick_menu', repo_name)
 

	
 
        def repo_lnk(name, rtype, rstate, private, fork_of):
 
            return _render('repo_name', name, rtype, rstate, private, fork_of,
 
                           short_name=short_name, admin=False)
 

	
 
        def last_change(last_change):
 
            return _render("last_change", last_change)
 

	
 
        def rss_lnk(repo_name):
 
            return _render("rss", repo_name)
 

	
 
        def atom_lnk(repo_name):
 
            return _render("atom", repo_name)
 

	
 
        def last_rev(repo_name, cs_cache):
 
            return _render('revision', repo_name, cs_cache.get('revision'),
 
                           cs_cache.get('raw_id'), cs_cache.get('author'),
 
                           cs_cache.get('message'))
 

	
 
        def desc(desc):
 
            return h.urlify_text(desc, truncate=80, stylize=c.visual.stylify_metatags)
 

	
 
        def state(repo_state):
 
            return _render("repo_state", repo_state)
 

	
 
        def repo_actions(repo_name):
 
            return _render('repo_actions', repo_name, super_user_actions)
 

	
 
        def owner_actions(user_id, username):
 
            return _render('user_name', user_id, username)
 

	
 
        repos_data = []
 
        for repo in repos_list:
 
            if perm_check:
 
                # check permission at this level
 
                if not HasRepoPermissionAny(
 
                        'repository.read', 'repository.write',
 
                        'repository.admin'
 
                )(repo.repo_name, 'get_repos_as_dict check'):
 
                    continue
 
            cs_cache = repo.changeset_cache
 
            row = {
 
                "menu": quick_menu(repo.repo_name),
 
                "raw_name": repo.repo_name,
 
                "name": repo_lnk(repo.repo_name, repo.repo_type,
 
                                 repo.repo_state, repo.private, repo.fork),
 
                "last_change_iso": repo.last_db_change.isoformat(),
 
                "last_change": last_change(repo.last_db_change),
 
                "last_changeset": last_rev(repo.repo_name, cs_cache),
 
                "last_rev_raw": cs_cache.get('revision'),
 
                "desc": desc(repo.description),
 
                "owner": h.person(repo.user),
 
                "state": state(repo.repo_state),
 
                "rss": rss_lnk(repo.repo_name),
 
                "atom": atom_lnk(repo.repo_name),
 

	
 
            }
 
            if admin:
 
                row.update({
 
                    "action": repo_actions(repo.repo_name),
 
                    "owner": owner_actions(repo.user.user_id,
 
                                           h.person(repo.user))
 
                })
 
            repos_data.append(row)
 

	
 
        return {
 
            "totalRecords": len(repos_list),
 
            "startIndex": 0,
 
            "sort": "name",
 
            "dir": "asc",
 
            "records": repos_data
 
        }
 

	
 
    def _get_defaults(self, repo_name):
 
        """
 
        Gets information about repository, and returns a dict for
 
        usage in forms
 

	
 
        :param repo_name:
 
        """
 

	
 
        repo_info = Repository.get_by_repo_name(repo_name)
 

	
 
        if repo_info is None:
 
            return None
 

	
kallithea/tests/functional/test_home.py
Show inline comments
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.meta import Session
 
from kallithea.model.db import Repository
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestHomeController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='home', action='index'))
 
        #if global permission is set
 
        response.mustcontain('Add Repository')
 
        # html in javascript variable:
 
        response.mustcontain('var data = {"totalRecords": %s' % len(Repository.getAll()))
 
        response.mustcontain('var data = {"totalRecords": %s' % len(Repository.get_all()))
 
        response.mustcontain(r'href=\"/%s\"' % HG_REPO)
 

	
 
        response.mustcontain(r'<span class="repotag">git')
 
        response.mustcontain(r'<i class=\"icon-globe\"')
 

	
 
        response.mustcontain("""fixes issue with having custom format for git-log""")
 
        response.mustcontain("""/%s/changeset/5f2c6ee195929b0be80749243c18121c9864a3b3""" % GIT_REPO)
 

	
 
        response.mustcontain("""disable security checks on hg clone for travis""")
 
        response.mustcontain("""/%s/changeset/96507bd11ecc815ebc6270fdf6db110928c09c1e""" % HG_REPO)
 

	
 
    def test_repo_summary_with_anonymous_access_disabled(self):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='summary',
 
                                        action='index', repo_name=HG_REPO),
 
                                        status=302)
 
            assert 'login' in response.location
 

	
 
    def test_index_with_anonymous_access_disabled(self):
 
        with fixture.anon_access(False):
 
            response = self.app.get(url(controller='home', action='index'),
 
                                    status=302)
 
            assert 'login' in response.location
 

	
 
    def test_index_page_on_groups(self):
 
        self.log_user()
 
        gr = fixture.create_repo_group(u'gr1')
 
        fixture.create_repo(name=u'gr1/repo_in_group', repo_group=gr)
 
        response = self.app.get(url('repos_group_home', group_name=u'gr1'))
 

	
 
        try:
 
            response.mustcontain(u"gr1/repo_in_group")
 
        finally:
 
            RepoModel().delete(u'gr1/repo_in_group')
 
            RepoGroupModel().delete(repo_group=u'gr1', force_delete=True)
 
            Session().commit()
kallithea/tests/other/manual_test_vcs_operations.py
Show inline comments
 
@@ -434,132 +434,132 @@ class TestVCSOperations(TestController):
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    def test_push_on_locked_repo_by_other_user_git(self):
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        #push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push('git', DEST,
 
                                             user=TEST_USER_REGULAR_LOGIN,
 
                                             passwd=TEST_USER_REGULAR_PASS)
 
        err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
 
        assert err in stderr
 

	
 
        #TODO: fix this somehow later on Git, Git is stupid and even if we throw
 
        #back 423 to it, it makes ANOTHER request and we fail there with 405 :/
 

	
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
 
        #msg = "405 Method Not Allowed"
 
        #assert msg in stderr
 

	
 
    def test_push_unlocks_repository_hg(self):
 
        # enable locking
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(fork_name, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 
        assert r.locked[0] == uid
 

	
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('hg', DEST, clone_url=clone_url.split()[0])
 
        assert str('remote: Released lock on repo `%s`' % fork_name) in stdout
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked == [None, None]
 

	
 
    #TODO: fix me ! somehow during tests hooks don't get called on Git
 
    def test_push_unlocks_repository_git(self):
 
        # enable locking
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(fork_name, dest=DEST)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('git', DEST, clone_url=clone_url.split()[0])
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        assert ('remote: Released lock on repo `%s`' % fork_name) in stderr
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r.locked == [None, None]
 

	
 
    def test_ip_restriction_hg(self):
 
        user_model = UserModel()
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            clone_url = _construct_url(HG_REPO)
 
            stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
            assert 'abort: HTTP Error 403: Forbidden' in stderr
 
        finally:
 
            #release IP restrictions
 
            for ip in UserIpMap.getAll():
 
            for ip in UserIpMap.get_all():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 

	
 
        time.sleep(2)
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_ip_restriction_git(self):
 
        user_model = UserModel()
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            clone_url = _construct_url(GIT_REPO)
 
            stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
            # The message apparently changed in Git 1.8.3, so match it loosely.
 
            assert re.search(r'\b403\b', stderr)
 
        finally:
 
            #release IP restrictions
 
            for ip in UserIpMap.getAll():
 
            for ip in UserIpMap.get_all():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 

	
 
        time.sleep(2)
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
0 comments (0 inline, 0 general)