Changeset - 08e8115585bd
[Not reviewed]
beta
0 4 0
Marcin Kuzminski - 13 years ago 2013-03-05 20:03:33
marcin@python-works.com
calling lock function without lock attribute, will return lock state
4 files changed with 41 insertions and 5 deletions:
0 comments (0 inline, 0 general)
docs/api/api.rst
Show inline comments
 
@@ -85,205 +85,206 @@ after that simply run any api command fo
 
 {'error': 'Missing non optional `repoid` arg in JSON DATA',
 
  'id': 75,
 
  'result': None}
 

	
 
Ups looks like we forgot to add an argument
 

	
 
Let's try again now giving the repoid as parameters::
 

	
 
    rhodecode-api get_repo repoid:rhodecode
 

	
 
    calling {"api_key": "<apikey>", "id": 39, "args": {"repoid": "rhodecode"}, "method": "get_repo"} to http://127.0.0.1:5000
 
    rhodecode said:
 
    {'error': None,
 
     'id': 39,
 
     'result': <json data...>}
 

	
 

	
 

	
 
API METHODS
 
+++++++++++
 

	
 

	
 
pull
 
----
 

	
 
Pulls given repo from remote location. Can be used to automatically keep
 
remote repos up to date. This command can be executed only using api_key
 
belonging to user with admin rights
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "pull"
 
    args :    {
 
                "repoid" : "<reponame or repo_id>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result : "Pulled from `<reponame>`"
 
    error :  null
 

	
 

	
 
rescan_repos
 
------------
 

	
 
Dispatch rescan repositories action. If remove_obsolete is set
 
RhodeCode will delete repos that are in database but not in the filesystem.
 
This command can be executed only using api_key belonging to user with admin
 
rights.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "rescan_repos"
 
    args :    {
 
                "remove_obsolete" : "<boolean = Optional(False)>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result : "{'added': [<list of names of added repos>],
 
               'removed': [<list of names of removed repos>]}"
 
    error :  null
 

	
 

	
 
invalidate_cache
 
----------------
 

	
 
Invalidate cache for repository.
 
This command can be executed only using api_key belonging to user with admin
 
rights or regular user that have write or admin or write access to repository.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "invalidate_cache"
 
    args :    {
 
                "repoid" : "<reponame or repo_id>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result : "Cache for repository `<reponame>` was invalidated: invalidated cache keys: <list_of_cache_keys>"
 
    error :  null
 

	
 
lock
 
----
 

	
 
Set locking state on given repository by given user. If userid param is skipped
 
, then it is set to id of user whos calling this method.
 
, then it is set to id of user whos calling this method. If locked param is skipped
 
then function shows current lock state of given repo.
 
This command can be executed only using api_key belonging to user with admin
 
rights or regular user that have admin or write access to repository.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "lock"
 
    args :    {
 
                "repoid" : "<reponame or repo_id>"
 
                "userid" : "<user_id or username = Optional(=apiuser)>",
 
                "locked" : "<bool true|false>"
 
                "locked" : "<bool true|false = Optional(=None)>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result : "User `<username>` set lock state for repo `<reponame>` to `true|false`"
 
    error :  null
 

	
 

	
 
show_ip
 
-------
 

	
 
Shows IP address as seen from RhodeCode server, together with all
 
defined IP addresses for given user.
 
This command can be executed only using api_key belonging to user with admin
 
rights.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "show_ip"
 
    args :    {
 
                "userid" : "<user_id or username>",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result : {
 
                 "ip_addr_server": <ip_from_clien>",
 
                 "user_ips": [
 
                                {
 
                                   "ip_addr": "<ip_with_mask>",
 
                                   "ip_range": ["<start_ip>", "<end_ip>"],
 
                                },
 
                                ...
 
                             ]
 
             }
 

	
 
    error :  null
 

	
 

	
 
get_user
 
--------
 

	
 
Get's an user by username or user_id, Returns empty result if user is not found.
 
If userid param is skipped it is set to id of user who is calling this method.
 
This command can be executed only using api_key belonging to user with admin
 
rights, or regular users that cannot specify different userid than theirs
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "get_user"
 
    args :    {
 
                "userid" : "<username or user_id Optional(=apiuser)>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: None if user does not exist or
 
            {
 
                "user_id" :     "<user_id>",
 
                "api_key" :     "<api_key>",
 
                "username" :    "<username>",
 
                "firstname":    "<firstname>",
 
                "lastname" :    "<lastname>",
 
                "email" :       "<email>",
 
                "emails":       "<list_of_all_additional_emails>",
 
                "ip_addresses": "<list_of_ip_addresses_for_user>",
 
                "active" :      "<bool>",
 
                "admin" :       "<bool>",
 
                "ldap_dn" :     "<ldap_dn>",
 
                "last_login":   "<last_login>",
 
                "permissions": {
 
                    "global": ["hg.create.repository",
 
                               "repository.read",
 
                               "hg.register.manual_activate"],
 
                    "repositories": {"repo1": "repository.none"},
 
                    "repositories_groups": {"Group1": "group.read"}
 
                 },
 
            }
 

	
 
    error:  null
 

	
 

	
 
get_users
 
---------
 

	
 
Lists all existing users. This command can be executed only using api_key
 
belonging to user with admin rights.
 

	
rhodecode/controllers/api/api.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.controllers.api
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    API controller for RhodeCode
 

	
 
    :created_on: Aug 20, 2011
 
    :author: marcink
 
    :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
# of the License or (at your opinion) any later version of the license.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 

	
 
import traceback
 
import logging
 
from pylons.controllers.util import abort
 

	
 
from rhodecode.controllers.api import JSONRPCController, JSONRPCError
 
from rhodecode.lib.auth import PasswordGenerator, AuthUser, \
 
    HasPermissionAllDecorator, HasPermissionAnyDecorator, \
 
    HasPermissionAnyApi, HasRepoPermissionAnyApi
 
from rhodecode.lib.utils import map_groups, repo2db_mapper
 
from rhodecode.lib.utils2 import str2bool
 
from rhodecode.lib.utils2 import str2bool, time_to_datetime
 
from rhodecode.lib import helpers as h
 
from rhodecode.model.meta import Session
 
from rhodecode.model.scm import ScmModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.user import UserModel
 
from rhodecode.model.users_group import UserGroupModel
 
from rhodecode.model.permission import PermissionModel
 
from rhodecode.model.db import Repository, RhodeCodeSetting, UserIpMap
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class OptionalAttr(object):
 
    """
 
    Special Optional Option that defines other attribute
 
    """
 
    def __init__(self, attr_name):
 
        self.attr_name = attr_name
 

	
 
    def __repr__(self):
 
        return '<OptionalAttr:%s>' % self.attr_name
 

	
 
    def __call__(self):
 
        return self
 
#alias
 
OAttr = OptionalAttr
 

	
 

	
 
class Optional(object):
 
    """
 
    Defines an optional parameter::
 

	
 
        param = param.getval() if isinstance(param, Optional) else param
 
        param = param() if isinstance(param, Optional) else param
 

	
 
    is equivalent of::
 

	
 
        param = Optional.extract(param)
 

	
 
    """
 
    def __init__(self, type_):
 
        self.type_ = type_
 

	
 
    def __repr__(self):
 
        return '<Optional:%s>' % self.type_.__repr__()
 

	
 
    def __call__(self):
 
        return self.getval()
 

	
 
    def getval(self):
 
        """
 
        returns value from this Optional instance
 
        """
 
        return self.type_
 

	
 
    @classmethod
 
    def extract(cls, val):
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
 

	
 

	
 
def get_user_or_error(userid):
 
    """
 
    Get user by id or name or return JsonRPCError if not found
 

	
 
    :param userid:
 
    """
 
    user = UserModel().get_user(userid)
 
    if user is None:
 
        raise JSONRPCError("user `%s` does not exist" % userid)
 
    return user
 

	
 

	
 
def get_repo_or_error(repoid):
 
    """
 
    Get repo by id or name or return JsonRPCError if not found
 

	
 
    :param userid:
 
    """
 
    repo = RepoModel().get_repo(repoid)
 
    if repo is None:
 
        raise JSONRPCError('repository `%s` does not exist' % (repoid))
 
    return repo
 

	
 

	
 
def get_users_group_or_error(usersgroupid):
 
    """
 
    Get user group by id or name or return JsonRPCError if not found
 

	
 
    :param userid:
 
    """
 
    users_group = UserGroupModel().get_group(usersgroupid)
 
    if users_group is None:
 
        raise JSONRPCError('user group `%s` does not exist' % usersgroupid)
 
    return users_group
 

	
 
@@ -136,221 +136,240 @@ def get_perm_or_error(permid):
 
    """
 
    Get permission by id or name or return JsonRPCError if not found
 

	
 
    :param userid:
 
    """
 
    perm = PermissionModel().get_permission_by_name(permid)
 
    if perm is None:
 
        raise JSONRPCError('permission `%s` does not exist' % (permid))
 
    return perm
 

	
 

	
 
class ApiController(JSONRPCController):
 
    """
 
    API Controller
 

	
 

	
 
    Each method needs to have USER as argument this is then based on given
 
    API_KEY propagated as instance of user object
 

	
 
    Preferably this should be first argument also
 

	
 

	
 
    Each function should also **raise** JSONRPCError for any
 
    errors that happens
 

	
 
    """
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def pull(self, apiuser, repoid):
 
        """
 
        Dispatch pull action on given repo
 

	
 
        :param apiuser:
 
        :param repoid:
 
        """
 

	
 
        repo = get_repo_or_error(repoid)
 

	
 
        try:
 
            ScmModel().pull_changes(repo.repo_name,
 
                                    self.rhodecode_user.username)
 
            return 'Pulled from `%s`' % repo.repo_name
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'Unable to pull changes from `%s`' % repo.repo_name
 
            )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def rescan_repos(self, apiuser, remove_obsolete=Optional(False)):
 
        """
 
        Dispatch rescan repositories action. If remove_obsolete is set
 
        than also delete repos that are in database but not in the filesystem.
 
        aka "clean zombies"
 

	
 
        :param apiuser:
 
        :param remove_obsolete:
 
        """
 

	
 
        try:
 
            rm_obsolete = Optional.extract(remove_obsolete)
 
            added, removed = repo2db_mapper(ScmModel().repo_scan(),
 
                                            remove_obsolete=rm_obsolete)
 
            return {'added': added, 'removed': removed}
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'Error occurred during rescan repositories action'
 
            )
 

	
 
    def invalidate_cache(self, apiuser, repoid):
 
        """
 
        Dispatch cache invalidation action on given repo
 

	
 
        :param apiuser:
 
        :param repoid:
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
 
            # check if we have admin permission for this repo !
 
            if HasRepoPermissionAnyApi('repository.admin',
 
                                       'repository.write')(user=apiuser,
 
                                            repo_name=repo.repo_name) is False:
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid))
 

	
 
        try:
 
            invalidated_keys = ScmModel().mark_for_invalidation(repo.repo_name)
 
            Session().commit()
 
            return ('Cache for repository `%s` was invalidated: '
 
                    'invalidated cache keys: %s' % (repoid, invalidated_keys))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'Error occurred during cache invalidation action'
 
            )
 

	
 
    def lock(self, apiuser, repoid, locked, userid=Optional(OAttr('apiuser'))):
 
    def lock(self, apiuser, repoid, locked=Optional(None),
 
             userid=Optional(OAttr('apiuser'))):
 
        """
 
        Set locking state on particular repository by given user, if
 
        this command is runned by non-admin account userid is set to user
 
        who is calling this method
 

	
 
        :param apiuser:
 
        :param repoid:
 
        :param userid:
 
        :param locked:
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            pass
 
        elif HasRepoPermissionAnyApi('repository.admin',
 
                                     'repository.write')(user=apiuser,
 
                                                         repo_name=repo.repo_name):
 
            #make sure normal user does not pass someone else userid,
 
            #he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != apiuser.user_id:
 
                raise JSONRPCError(
 
                    'userid is not the same as your user'
 
                )
 
        else:
 
            raise JSONRPCError('repository `%s` does not exist' % (repoid))
 

	
 
        if isinstance(userid, Optional):
 
            userid = apiuser.user_id
 

	
 
        user = get_user_or_error(userid)
 

	
 
        if isinstance(locked, Optional):
 
            lockobj = Repository.getlock(repo)
 

	
 
            if lockobj[0] is None:
 
                return ('Repo `%s` not locked. Locked=`False`.'
 
                        % (repo.repo_name))
 
            else:
 
                userid, time_ = lockobj
 
                user = get_user_or_error(userid)
 

	
 
                return ('Repo `%s` locked by `%s`. Locked=`True`. '
 
                        'Locked since: `%s`'
 
                    % (repo.repo_name, user.username,
 
                       h.fmt_date(time_to_datetime(time_))))
 

	
 
        else:
 
        locked = str2bool(locked)
 
        try:
 
            if locked:
 
                Repository.lock(repo, user.user_id)
 
            else:
 
                Repository.unlock(repo)
 

	
 
            return ('User `%s` set lock state for repo `%s` to `%s`'
 
                    % (user.username, repo.repo_name, locked))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'Error occurred locking repository `%s`' % repo.repo_name
 
            )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def show_ip(self, apiuser, userid):
 
        """
 
        Shows IP address as seen from RhodeCode server, together with all
 
        defined IP addresses for given user
 

	
 
        :param apiuser:
 
        :param userid:
 
        """
 
        user = get_user_or_error(userid)
 
        ips = UserIpMap.query().filter(UserIpMap.user == user).all()
 
        return dict(
 
            ip_addr_server=self.ip_addr,
 
            user_ips=ips
 
        )
 

	
 
    def get_user(self, apiuser, userid=Optional(OAttr('apiuser'))):
 
        """"
 
        Get a user by username, or userid, if userid is given
 

	
 
        :param apiuser:
 
        :param userid:
 
        """
 
        if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
 
            #make sure normal user does not pass someone else userid,
 
            #he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != apiuser.user_id:
 
                raise JSONRPCError(
 
                    'userid is not the same as your user'
 
                )
 

	
 
        if isinstance(userid, Optional):
 
            userid = apiuser.user_id
 

	
 
        user = get_user_or_error(userid)
 
        data = user.get_api_data()
 
        data['permissions'] = AuthUser(user_id=user.user_id).permissions
 
        return data
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def get_users(self, apiuser):
 
        """"
 
        Get all users
 

	
 
        :param apiuser:
 
        """
 

	
 
        result = []
 
        for user in UserModel().get_all():
 
            result.append(user.get_api_data())
 
        return result
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def create_user(self, apiuser, username, email, password,
 
                    firstname=Optional(None), lastname=Optional(None),
 
                    active=Optional(True), admin=Optional(False),
 
                    ldap_dn=Optional(None)):
 
        """
 
        Create new user
 

	
 
        :param apiuser:
 
        :param username:
 
        :param email:
 
        :param password:
 
        :param firstname:
 
        :param lastname:
 
        :param active:
 
        :param admin:
 
        :param ldap_dn:
 
        """
 

	
 
        if UserModel().get_by_username(username):
 
            raise JSONRPCError("user `%s` already exist" % username)
 

	
 
        if UserModel().get_by_email(email, case_insensitive=True):
 
            raise JSONRPCError("email `%s` already exist" % email)
 

	
 
        if Optional.extract(ldap_dn):
 
            # generate temporary password if ldap_dn
 
            password = PasswordGenerator().gen_password(length=8)
 

	
rhodecode/model/db.py
Show inline comments
 
@@ -909,192 +909,196 @@ class Repository(Base, BaseModel):
 
        p += self.repo_name.split(Repository.url_sep())
 
        return os.path.join(*p)
 

	
 
    @property
 
    def cache_keys(self):
 
        """
 
        Returns associated cache keys for that repo
 
        """
 
        return CacheInvalidation.query()\
 
            .filter(CacheInvalidation.cache_args == self.repo_name)\
 
            .order_by(CacheInvalidation.cache_key)\
 
            .all()
 

	
 
    def get_new_name(self, repo_name):
 
        """
 
        returns new full repository name based on assigned group and new new
 

	
 
        :param group_name:
 
        """
 
        path_prefix = self.group.full_path_splitted if self.group else []
 
        return Repository.url_sep().join(path_prefix + [repo_name])
 

	
 
    @property
 
    def _ui(self):
 
        """
 
        Creates an db based ui object for this repository
 
        """
 
        from rhodecode.lib.utils import make_ui
 
        return make_ui('db', clear_session=False)
 

	
 
    @classmethod
 
    def inject_ui(cls, repo, extras={}):
 
        from rhodecode.lib.vcs.backends.hg import MercurialRepository
 
        from rhodecode.lib.vcs.backends.git import GitRepository
 
        required = (MercurialRepository, GitRepository)
 
        if not isinstance(repo, required):
 
            raise Exception('repo must be instance of %s' % required)
 

	
 
        # inject ui extra param to log this action via push logger
 
        for k, v in extras.items():
 
            repo._repo.ui.setconfig('rhodecode_extras', k, v)
 

	
 
    @classmethod
 
    def is_valid(cls, repo_name):
 
        """
 
        returns True if given repo name is a valid filesystem repository
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        from rhodecode.lib.utils import is_valid_repo
 

	
 
        return is_valid_repo(repo_name, cls.base_path())
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating repo api data
 

	
 
        """
 
        repo = self
 
        data = dict(
 
            repo_id=repo.repo_id,
 
            repo_name=repo.repo_name,
 
            repo_type=repo.repo_type,
 
            clone_uri=repo.clone_uri,
 
            private=repo.private,
 
            created_on=repo.created_on,
 
            description=repo.description,
 
            landing_rev=repo.landing_rev,
 
            owner=repo.user.username,
 
            fork_of=repo.fork.repo_name if repo.fork else None,
 
            enable_statistics=repo.enable_statistics,
 
            enable_locking=repo.enable_locking,
 
            enable_downloads=repo.enable_downloads,
 
            last_changeset=repo.changeset_cache
 
        )
 
        rc_config = RhodeCodeSetting.get_app_settings()
 
        repository_fields = str2bool(rc_config.get('rhodecode_repository_fields'))
 
        if repository_fields:
 
            for f in self.extra_fields:
 
                data[f.field_key_prefixed] = f.field_value
 

	
 
        return data
 

	
 
    @classmethod
 
    def lock(cls, repo, user_id):
 
        repo.locked = [user_id, time.time()]
 
        Session().add(repo)
 
        Session().commit()
 

	
 
    @classmethod
 
    def unlock(cls, repo):
 
        repo.locked = None
 
        Session().add(repo)
 
        Session().commit()
 

	
 
    @classmethod
 
    def getlock(cls, repo):
 
        return repo.locked
 

	
 
    @property
 
    def last_db_change(self):
 
        return self.updated_on
 

	
 
    def clone_url(self, **override):
 
        from pylons import url
 
        from urlparse import urlparse
 
        import urllib
 
        parsed_url = urlparse(url('home', qualified=True))
 
        default_clone_uri = '%(scheme)s://%(user)s%(pass)s%(netloc)s%(prefix)s%(path)s'
 
        decoded_path = safe_unicode(urllib.unquote(parsed_url.path))
 
        args = {
 
           'user': '',
 
           'pass': '',
 
           'scheme': parsed_url.scheme,
 
           'netloc': parsed_url.netloc,
 
           'prefix': decoded_path,
 
           'path': self.repo_name
 
        }
 

	
 
        args.update(override)
 
        return default_clone_uri % args
 

	
 
    #==========================================================================
 
    # SCM PROPERTIES
 
    #==========================================================================
 

	
 
    def get_changeset(self, rev=None):
 
        return get_changeset_safe(self.scm_instance, rev)
 

	
 
    def get_landing_changeset(self):
 
        """
 
        Returns landing changeset, or if that doesn't exist returns the tip
 
        """
 
        cs = self.get_changeset(self.landing_rev) or self.get_changeset()
 
        return cs
 

	
 
    def update_changeset_cache(self, cs_cache=None):
 
        """
 
        Update cache of last changeset for repository, keys should be::
 

	
 
            short_id
 
            raw_id
 
            revision
 
            message
 
            date
 
            author
 

	
 
        :param cs_cache:
 
        """
 
        from rhodecode.lib.vcs.backends.base import BaseChangeset
 
        if cs_cache is None:
 
            cs_cache = EmptyChangeset()
 
            # use no-cache version here
 
            scm_repo = self.scm_instance_no_cache
 
            if scm_repo:
 
                cs_cache = scm_repo.get_changeset()
 

	
 
        if isinstance(cs_cache, BaseChangeset):
 
            cs_cache = cs_cache.__json__()
 

	
 
        if (cs_cache != self.changeset_cache or not self.changeset_cache):
 
            _default = datetime.datetime.fromtimestamp(0)
 
            last_change = cs_cache.get('date') or _default
 
            log.debug('updated repo %s with new cs cache %s' % (self, cs_cache))
 
            self.updated_on = last_change
 
            self.changeset_cache = cs_cache
 
            Session().add(self)
 
            Session().commit()
 
        else:
 
            log.debug('Skipping repo:%s already with latest changes' % self)
 

	
 
    @property
 
    def tip(self):
 
        return self.get_changeset('tip')
 

	
 
    @property
 
    def author(self):
 
        return self.tip.author
 

	
 
    @property
 
    def last_change(self):
 
        return self.scm_instance.last_change
 

	
 
    def get_comments(self, revisions=None):
 
        """
 
        Returns comments for this repository grouped by revisions
 

	
 
        :param revisions: filter query by revisions only
 
        """
 
        cmts = ChangesetComment.query()\
 
            .filter(ChangesetComment.repo == self)
 
        if revisions:
 
            cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
 
        grouped = defaultdict(list)
 
        for cmt in cmts.all():
rhodecode/tests/api/api_base.py
Show inline comments
 
@@ -277,192 +277,204 @@ class BaseTestApi(object):
 

	
 
        expected = {'added': [], 'removed': []}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'repo_scan', crash)
 
    def test_api_rescann_error(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos',)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during rescan repositories action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache(self):
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = ("Cache for repository `%s` was invalidated: "
 
                    "invalidated cache keys: %s" % (self.REPO,
 
                                                    [unicode(self.REPO)]))
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
 
    def test_api_invalidate_cache_error(self):
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during cache invalidation action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = ('User `%s` set lock state for repo `%s` to `%s`'
 
                   % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire_by_non_admin(self):
 
        repo_name = 'api_delete_me'
 
        create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'lock',
 
                                      repoid=repo_name,
 
                                      locked=True)
 
            response = api_call(self, params)
 
            expected = ('User `%s` set lock state for repo `%s` to `%s`'
 
                       % (self.TEST_USER_LOGIN, repo_name, True))
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            destroy_repo(repo_name)
 

	
 
    def test_api_lock_repo_lock_aquire_non_admin_with_userid(self):
 
        repo_name = 'api_delete_me'
 
        create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'lock',
 
                                      userid=TEST_USER_ADMIN_LOGIN,
 
                                      repoid=repo_name,
 
                                      locked=True)
 
            response = api_call(self, params)
 
            expected = 'userid is not the same as your user'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            destroy_repo(repo_name)
 

	
 
    def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
 
        id_, params = _build_data(self.apikey_regular, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_release(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=False)
 
        response = api_call(self, params)
 
        expected = ('User `%s` set lock state for repo `%s` to `%s`'
 
                   % (TEST_USER_ADMIN_LOGIN, self.REPO, False))
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire_optional_userid(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = ('User `%s` set lock state for repo `%s` to `%s`'
 
                   % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_optional_locked(self):
 
        from rhodecode.lib import helpers
 
        from rhodecode.lib.utils2 import  time_to_datetime
 
        _locked_since = helpers.fmt_date(time_to_datetime(Repository\
 
                                    .get_by_repo_name(self.REPO).locked[1]))
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 
        expected = ('Repo `%s` locked by `%s`. Locked=`True`. Locked since: `%s`'
 
                   % (self.REPO, TEST_USER_ADMIN_LOGIN, _locked_since))
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(Repository, 'lock', crash)
 
    def test_api_lock_error(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred locking repository `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_existing_user(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN,
 
                                  email='test@foo.com',
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user_with_existing_email(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN + 'new',
 
                                  email=TEST_USER_REGULAR_EMAIL,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user(self):
 
        username = 'test_new_api_user'
 
        email = username + "@foo.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        UserModel().delete(usr.user_id)
 
        Session().commit()
 

	
 
    @mock.patch.object(UserModel, 'create_or_update', crash)
 
    def test_api_create_user_when_exception_happened(self):
 

	
 
        username = 'test_new_api_user'
 
        email = username + "@foo.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 
        expected = 'failed to create user `%s`' % username
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_delete_user(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@rhodecode.org',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 
        email = usr.email
 
        usr_id = usr.user_id
 
        ## DELETE THIS USER NOW
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user',
 
                                  userid=username,)
 
        response = api_call(self, params)
 

	
 
        ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
 
               'user': None}
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'delete', crash)
 
    def test_api_delete_user_when_exception_happened(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@rhodecode.org',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 

	
0 comments (0 inline, 0 general)