Changeset - b8f929bff7e3
[Not reviewed]
rhodecode/lib/auth.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.lib.auth
 
    ~~~~~~~~~~~~~~~~~~
 

	
 
    authentication and permission libraries
 

	
 
    :created_on: Apr 4, 2010
 
    :author: marcink
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import random
 
import logging
 
import traceback
 
import hashlib
 

	
 
from tempfile import _RandomNameSequence
 
from decorator import decorator
 

	
 
from pylons import config, url, request
 
from pylons.controllers.util import abort, redirect
 
from pylons.i18n.translation import _
 
from sqlalchemy.orm.exc import ObjectDeletedError
 

	
 
from rhodecode import __platform__, is_windows, is_unix
 
from rhodecode.model.meta import Session
 

	
 
from rhodecode.lib.utils2 import str2bool, safe_unicode
 
from rhodecode.lib.exceptions import LdapPasswordError, LdapUsernameError
 
from rhodecode.lib.utils import get_repo_slug, get_repos_group_slug
 
from rhodecode.lib.auth_ldap import AuthLdap
 

	
 
from rhodecode.model import meta
 
from rhodecode.model.user import UserModel
 
from rhodecode.model.db import Permission, RhodeCodeSetting, User, UserIpMap
 
from rhodecode.lib.caching_query import FromCache
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PasswordGenerator(object):
 
    """
 
    This is a simple class for generating password from different sets of
 
    characters
 
    usage::
 

	
 
        passwd_gen = PasswordGenerator()
 
        #print 8-letter password containing only big and small letters
 
            of alphabet
 
        passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
 
    """
 
    ALPHABETS_NUM = r'''1234567890'''
 
    ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
 
    ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
 
    ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
 
    ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
 
        + ALPHABETS_NUM + ALPHABETS_SPECIAL
 
    ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
 
    ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
 
    ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
 
    ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
 

	
 
    def __init__(self, passwd=''):
 
        self.passwd = passwd
 

	
 
    def gen_password(self, length, type_=None):
 
        if type_ is None:
 
            type_ = self.ALPHABETS_FULL
 
        self.passwd = ''.join([random.choice(type_) for _ in xrange(length)])
 
        return self.passwd
 

	
 

	
 
class RhodeCodeCrypto(object):
 

	
 
    @classmethod
 
    def hash_string(cls, str_):
 
        """
 
        Cryptographic function used for password hashing based on pybcrypt
 
        or pycrypto in windows
 

	
 
        :param password: password to hash
 
        """
 
        if is_windows:
 
            from hashlib import sha256
 
            return sha256(str_).hexdigest()
 
        elif is_unix:
 
            import bcrypt
 
            return bcrypt.hashpw(str_, bcrypt.gensalt(10))
 
        else:
 
            raise Exception('Unknown or unsupported platform %s' \
 
                            % __platform__)
 

	
 
    @classmethod
 
    def hash_check(cls, password, hashed):
 
        """
 
        Checks matching password with it's hashed value, runs different
 
        implementation based on platform it runs on
 

	
 
        :param password: password
 
        :param hashed: password in hashed form
 
        """
 

	
 
        if is_windows:
 
            from hashlib import sha256
 
            return sha256(password).hexdigest() == hashed
 
        elif is_unix:
 
            import bcrypt
 
            return bcrypt.hashpw(password, hashed) == hashed
 
        else:
 
            raise Exception('Unknown or unsupported platform %s' \
 
                            % __platform__)
 

	
 

	
 
def get_crypt_password(password):
 
    return RhodeCodeCrypto.hash_string(password)
 

	
 

	
 
def check_password(password, hashed):
 
    return RhodeCodeCrypto.hash_check(password, hashed)
 

	
 

	
 
def generate_api_key(str_, salt=None):
 
    """
 
    Generates API KEY from given string
 

	
 
    :param str_:
 
    :param salt:
 
    """
 

	
 
    if salt is None:
 
        salt = _RandomNameSequence().next()
 

	
 
    return hashlib.sha1(str_ + salt).hexdigest()
 

	
 

	
 
def authfunc(environ, username, password):
 
    """
 
    Dummy authentication wrapper function used in Mercurial and Git for
 
    access control.
 

	
 
    :param environ: needed only for using in Basic auth
 
    """
 
    return authenticate(username, password)
 

	
 

	
 
def authenticate(username, password):
 
    """
 
    Authentication function used for access control,
 
    firstly checks for db authentication then if ldap is enabled for ldap
 
    authentication, also creates ldap user if not in database
 

	
 
    :param username: username
 
    :param password: password
 
    """
 

	
 
    user_model = UserModel()
 
    user = User.get_by_username(username)
 

	
 
    log.debug('Authenticating user using RhodeCode account')
 
    if user is not None and not user.ldap_dn:
 
        if user.active:
 
            if user.username == 'default' and user.active:
 
                log.info('user %s authenticated correctly as anonymous user' %
 
                         username)
 
                return True
 

	
 
            elif user.username == username and check_password(password,
 
                                                              user.password):
 
                log.info('user %s authenticated correctly' % username)
 
                return True
 
        else:
 
            log.warning('user %s tried auth but is disabled' % username)
 

	
 
    else:
 
        log.debug('Regular authentication failed')
 
        user_obj = User.get_by_username(username, case_insensitive=True)
 

	
 
        if user_obj is not None and not user_obj.ldap_dn:
 
            log.debug('this user already exists as non ldap')
 
            return False
 

	
 
        ldap_settings = RhodeCodeSetting.get_ldap_settings()
 
        #======================================================================
 
        # FALLBACK TO LDAP AUTH IF ENABLE
 
        #======================================================================
 
        if str2bool(ldap_settings.get('ldap_active')):
 
            log.debug("Authenticating user using ldap")
 
            kwargs = {
 
                  'server': ldap_settings.get('ldap_host', ''),
 
                  'base_dn': ldap_settings.get('ldap_base_dn', ''),
 
                  'port': ldap_settings.get('ldap_port'),
 
                  'bind_dn': ldap_settings.get('ldap_dn_user'),
 
                  'bind_pass': ldap_settings.get('ldap_dn_pass'),
 
                  'tls_kind': ldap_settings.get('ldap_tls_kind'),
 
                  'tls_reqcert': ldap_settings.get('ldap_tls_reqcert'),
 
                  'ldap_filter': ldap_settings.get('ldap_filter'),
 
                  'search_scope': ldap_settings.get('ldap_search_scope'),
 
                  'attr_login': ldap_settings.get('ldap_attr_login'),
 
                  'ldap_version': 3,
 
                  }
 
            log.debug('Checking for ldap authentication')
 
            try:
 
                aldap = AuthLdap(**kwargs)
 
                (user_dn, ldap_attrs) = aldap.authenticate_ldap(username,
 
                                                                password)
 
                log.debug('Got ldap DN response %s' % user_dn)
 

	
 
                get_ldap_attr = lambda k: ldap_attrs.get(ldap_settings\
 
                                                           .get(k), [''])[0]
 

	
 
                user_attrs = {
 
                 'name': safe_unicode(get_ldap_attr('ldap_attr_firstname')),
 
                 'lastname': safe_unicode(get_ldap_attr('ldap_attr_lastname')),
 
                 'email': get_ldap_attr('ldap_attr_email'),
 
                 'active': 'hg.register.auto_activate' in User\
 
                    .get_by_username('default').AuthUser.permissions['global']
 
                }
 

	
 
                # don't store LDAP password since we don't need it. Override
 
                # with some random generated password
 
                _password = PasswordGenerator().gen_password(length=8)
 
                # create this user on the fly if it doesn't exist in rhodecode
 
                # database
 
                if user_model.create_ldap(username, _password, user_dn,
 
                                          user_attrs):
 
                    log.info('created new ldap user %s' % username)
 

	
 
                Session().commit()
 
                return True
 
            except (LdapUsernameError, LdapPasswordError,):
 
                pass
 
            except (Exception,):
 
                log.error(traceback.format_exc())
 
                pass
 
    return False
 

	
 

	
 
def login_container_auth(username):
 
    user = User.get_by_username(username)
 
    if user is None:
 
        user_attrs = {
 
            'name': username,
 
            'lastname': None,
 
            'email': None,
 
            'active': 'hg.register.auto_activate' in User\
 
               .get_by_username('default').AuthUser.permissions['global']
 
        }
 
        user = UserModel().create_for_container_auth(username, user_attrs)
 
        if not user:
 
            return None
 
        log.info('User %s was created by container authentication' % username)
 

	
 
    if not user.active:
 
        return None
 

	
 
    user.update_lastlogin()
 
    Session().commit()
 

	
 
    log.debug('User %s is now logged in by container authentication',
 
              user.username)
 
    return user
 

	
 

	
 
def get_container_username(environ, config, clean_username=False):
 
    """
 
    Get's the container_auth username (or email). It tries to get username
 
    from REMOTE_USER if container_auth_enabled is enabled, if that fails
 
    it tries to get username from HTTP_X_FORWARDED_USER if proxypass_auth_enabled
 
    is enabled. clean_username extracts the username from this data if it's
 
    having @ in it.
 

	
 
    :param environ:
 
    :param config:
 
    :param clean_username:
 
    """
 
    username = None
 

	
 
    if str2bool(config.get('container_auth_enabled', False)):
 
        from paste.httpheaders import REMOTE_USER
 
        username = REMOTE_USER(environ)
 
        log.debug('extracted REMOTE_USER:%s' % (username))
 

	
 
    if not username and str2bool(config.get('proxypass_auth_enabled', False)):
 
        username = environ.get('HTTP_X_FORWARDED_USER')
 
        log.debug('extracted HTTP_X_FORWARDED_USER:%s' % (username))
 

	
 
    if username and clean_username:
 
        # Removing realm and domain from username
 
        username = username.partition('@')[0]
 
        username = username.rpartition('\\')[2]
 
    log.debug('Received username %s from container' % username)
 

	
 
    return username
 

	
 

	
 
class CookieStoreWrapper(object):
 

	
 
    def __init__(self, cookie_store):
 
        self.cookie_store = cookie_store
 

	
 
    def __repr__(self):
 
        return 'CookieStore<%s>' % (self.cookie_store)
 

	
 
    def get(self, key, other=None):
 
        if isinstance(self.cookie_store, dict):
 
            return self.cookie_store.get(key, other)
 
        elif isinstance(self.cookie_store, AuthUser):
 
            return self.cookie_store.__dict__.get(key, other)
 

	
 

	
 
class  AuthUser(object):
 
    """
 
    A simple object that handles all attributes of user in RhodeCode
 

	
 
    It does lookup based on API key,given user, or user present in session
 
    Then it fills all required information for such user. It also checks if
 
    anonymous access is enabled and if so, it returns default user as logged
 
    in
 
    """
 

	
 
    def __init__(self, user_id=None, api_key=None, username=None, ip_addr=None):
 

	
 
        self.user_id = user_id
 
        self.api_key = None
 
        self.username = username
 
        self.ip_addr = ip_addr
 

	
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.is_authenticated = False
 
        self.admin = False
 
        self.inherit_default_permissions = False
 
        self.permissions = {}
 
        self._api_key = api_key
 
        self.propagate_data()
 
        self._instance = None
 

	
 
    def propagate_data(self):
 
        user_model = UserModel()
 
        self.anonymous_user = User.get_by_username('default', cache=True)
 
        is_user_loaded = False
 

	
 
        # try go get user by api key
 
        if self._api_key and self._api_key != self.anonymous_user.api_key:
 
            log.debug('Auth User lookup by API KEY %s' % self._api_key)
 
            is_user_loaded = user_model.fill_data(self, api_key=self._api_key)
 
        # lookup by userid
 
        elif (self.user_id is not None and
 
              self.user_id != self.anonymous_user.user_id):
 
            log.debug('Auth User lookup by USER ID %s' % self.user_id)
 
            is_user_loaded = user_model.fill_data(self, user_id=self.user_id)
 
        # lookup by username
 
        elif self.username and \
 
            str2bool(config.get('container_auth_enabled', False)):
 

	
 
            log.debug('Auth User lookup by USER NAME %s' % self.username)
 
            dbuser = login_container_auth(self.username)
 
            if dbuser is not None:
 
                log.debug('filling all attributes to object')
 
                for k, v in dbuser.get_dict().items():
 
                    setattr(self, k, v)
 
                self.set_authenticated()
 
                is_user_loaded = True
 
        else:
 
            log.debug('No data in %s that could been used to log in' % self)
 

	
 
        if not is_user_loaded:
 
            # if we cannot authenticate user try anonymous
 
            if self.anonymous_user.active is True:
 
                user_model.fill_data(self, user_id=self.anonymous_user.user_id)
 
                # then we set this user is logged in
 
                self.is_authenticated = True
 
            else:
 
                self.user_id = None
 
                self.username = None
 
                self.is_authenticated = False
 

	
 
        if not self.username:
 
            self.username = 'None'
 

	
 
        log.debug('Auth User is now %s' % self)
 
        user_model.fill_perms(self)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @property
 
    def repos_admin(self):
 
        """
 
        Returns list of repositories you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories'].iteritems()
 
                if x[1] == 'repository.admin']
 

	
 
    @property
 
    def groups_admin(self):
 
        """
 
        Returns list of repositories groups you're an admin of
 
        Returns list of repository groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories_groups'].iteritems()
 
                if x[1] == 'group.admin']
 

	
 
    @property
 
    def ip_allowed(self):
 
        """
 
        Checks if ip_addr used in constructor is allowed from defined list of
 
        allowed ip_addresses for user
 

	
 
        :returns: boolean, True if ip is in allowed ip range
 
        """
 
        #check IP
 
        allowed_ips = AuthUser.get_allowed_ips(self.user_id, cache=True)
 
        if check_ip_access(source_ip=self.ip_addr, allowed_ips=allowed_ips):
 
            log.debug('IP:%s is in range of %s' % (self.ip_addr, allowed_ips))
 
            return True
 
        else:
 
            log.info('Access for IP:%s forbidden, '
 
                     'not in %s' % (self.ip_addr, allowed_ips))
 
            return False
 

	
 
    def __repr__(self):
 
        return "<AuthUser('id:%s:%s|%s')>" % (self.user_id, self.username,
 
                                              self.is_authenticated)
 

	
 
    def set_authenticated(self, authenticated=True):
 
        if self.user_id != self.anonymous_user.user_id:
 
            self.is_authenticated = authenticated
 

	
 
    def get_cookie_store(self):
 
        return {'username': self.username,
 
                'user_id': self.user_id,
 
                'is_authenticated': self.is_authenticated}
 

	
 
    @classmethod
 
    def from_cookie_store(cls, cookie_store):
 
        """
 
        Creates AuthUser from a cookie store
 

	
 
        :param cls:
 
        :param cookie_store:
 
        """
 
        user_id = cookie_store.get('user_id')
 
        username = cookie_store.get('username')
 
        api_key = cookie_store.get('api_key')
 
        return AuthUser(user_id, api_key, username)
 

	
 
    @classmethod
 
    def get_allowed_ips(cls, user_id, cache=False):
 
        _set = set()
 
        user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
 
        if cache:
 
            user_ips = user_ips.options(FromCache("sql_cache_short",
 
                                                  "get_user_ips_%s" % user_id))
 
        for ip in user_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 
        return _set or set(['0.0.0.0/0', '::/0'])
 

	
 

	
 
def set_available_permissions(config):
 
    """
 
    This function will propagate pylons globals with all available defined
 
    permission given in db. We don't want to check each time from db for new
 
    permissions since adding a new permission also requires application restart
 
    ie. to decorate new views with the newly created permission
 

	
 
    :param config: current pylons config instance
 

	
 
    """
 
    log.info('getting information about all available permissions')
 
    try:
 
        sa = meta.Session
 
        all_perms = sa.query(Permission).all()
 
    except Exception:
 
        pass
 
    finally:
 
        meta.Session.remove()
 

	
 
    config['available_permissions'] = [x.permission_name for x in all_perms]
 

	
 

	
 
#==============================================================================
 
# CHECK DECORATORS
 
#==============================================================================
 
class LoginRequired(object):
 
    """
 
    Must be logged in to execute this function else
 
    redirect to login page
 

	
 
    :param api_access: if enabled this checks only for valid auth token
 
        and grants access based on valid token
 
    """
 

	
 
    def __init__(self, api_access=False):
 
        self.api_access = api_access
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        user = cls.rhodecode_user
 
        loc = "%s:%s" % (cls.__class__.__name__, func.__name__)
 

	
 
        #check IP
 
        ip_access_ok = True
 
        if not user.ip_allowed:
 
            from rhodecode.lib import helpers as h
 
            h.flash(h.literal(_('IP %s not allowed' % (user.ip_addr))),
 
                    category='warning')
 
            ip_access_ok = False
 

	
 
        api_access_ok = False
 
        if self.api_access:
 
            log.debug('Checking API KEY access for %s' % cls)
 
            if user.api_key == request.GET.get('api_key'):
 
                api_access_ok = True
 
            else:
 
                log.debug("API KEY token not valid")
 

	
 
        log.debug('Checking if %s is authenticated @ %s' % (user.username, loc))
 
        if (user.is_authenticated or api_access_ok) and ip_access_ok:
 
            reason = 'RegularAuth' if user.is_authenticated else 'APIAuth'
 
            log.info('user %s is authenticated and granted access to %s '
 
                     'using %s' % (user.username, loc, reason)
 
            )
 
            return func(*fargs, **fkwargs)
 
        else:
 
            log.warn('user %s NOT authenticated on func: %s' % (
 
                user, loc)
 
            )
 
            p = url.current()
 

	
 
            log.debug('redirecting to login page with %s' % p)
 
            return redirect(url('login_home', came_from=p))
 

	
 

	
 
class NotAnonymous(object):
 
    """
 
    Must be logged in to execute this function else
 
    redirect to login page"""
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        self.user = cls.rhodecode_user
 

	
 
        log.debug('Checking if user is not anonymous @%s' % cls)
 

	
 
        anonymous = self.user.username == 'default'
 

	
 
        if anonymous:
 
            p = url.current()
 

	
 
            import rhodecode.lib.helpers as h
 
            h.flash(_('You need to be a registered user to '
 
                      'perform this action'),
 
                    category='warning')
 
            return redirect(url('login_home', came_from=p))
 
        else:
 
            return func(*fargs, **fkwargs)
 

	
 

	
 
class PermsDecorator(object):
 
    """Base class for controller decorators"""
 

	
 
    def __init__(self, *required_perms):
 
        available_perms = config['available_permissions']
 
        for perm in required_perms:
 
            if perm not in available_perms:
 
                raise Exception("'%s' permission is not defined" % perm)
 
        self.required_perms = set(required_perms)
 
        self.user_perms = None
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        self.user = cls.rhodecode_user
 
        self.user_perms = self.user.permissions
 
        log.debug('checking %s permissions %s for %s %s',
 
           self.__class__.__name__, self.required_perms, cls, self.user)
 

	
 
        if self.check_permissions():
 
            log.debug('Permission granted for %s %s' % (cls, self.user))
 
            return func(*fargs, **fkwargs)
 

	
 
        else:
 
            log.debug('Permission denied for %s %s' % (cls, self.user))
 
            anonymous = self.user.username == 'default'
 

	
 
            if anonymous:
 
                p = url.current()
 

	
 
                import rhodecode.lib.helpers as h
 
                h.flash(_('You need to be a signed in to '
 
                          'view this page'),
 
                        category='warning')
 
                return redirect(url('login_home', came_from=p))
 

	
 
            else:
 
                # redirect with forbidden ret code
 
                return abort(403)
 

	
 
    def check_permissions(self):
 
        """Dummy function for overriding"""
 
        raise Exception('You have to write this function in child class')
 

	
 

	
 
class HasPermissionAllDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for all given predicates. All of them
 
    have to be meet in order to fulfill the request
 
    """
 

	
 
    def check_permissions(self):
 
        if self.required_perms.issubset(self.user_perms.get('global')):
 
            return True
 
        return False
 

	
 

	
 
class HasPermissionAnyDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for any of given predicates. In order to
 
    fulfill the request any of predicates must be meet
 
    """
 

	
 
    def check_permissions(self):
 
        if self.required_perms.intersection(self.user_perms.get('global')):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoPermissionAllDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for all given predicates for specific
 
    repository. All of them have to be meet in order to fulfill the request
 
    """
 

	
 
    def check_permissions(self):
 
        repo_name = get_repo_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['repositories'][repo_name]])
 
        except KeyError:
 
            return False
 
        if self.required_perms.issubset(user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoPermissionAnyDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for any of given predicates for specific
 
    repository. In order to fulfill the request any of predicates must be meet
 
    """
 

	
 
    def check_permissions(self):
 
        repo_name = get_repo_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['repositories'][repo_name]])
 
        except KeyError:
 
            return False
 

	
 
        if self.required_perms.intersection(user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasReposGroupPermissionAllDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for all given predicates for specific
 
    repository. All of them have to be meet in order to fulfill the request
 
    """
 

	
 
    def check_permissions(self):
 
        group_name = get_repos_group_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['repositories_groups'][group_name]])
 
        except KeyError:
 
            return False
 

	
 
        if self.required_perms.issubset(user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasReposGroupPermissionAnyDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for any of given predicates for specific
 
    repository. In order to fulfill the request any of predicates must be meet
 
    """
 

	
 
    def check_permissions(self):
 
        group_name = get_repos_group_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['repositories_groups'][group_name]])
 
        except KeyError:
 
            return False
 

	
 
        if self.required_perms.intersection(user_perms):
 
            return True
 
        return False
 

	
 

	
 
#==============================================================================
 
# CHECK FUNCTIONS
 
#==============================================================================
 
class PermsFunction(object):
 
    """Base function for other check functions"""
 

	
 
    def __init__(self, *perms):
 
        available_perms = config['available_permissions']
 

	
 
        for perm in perms:
 
            if perm not in available_perms:
 
                raise Exception("'%s' permission is not defined" % perm)
 
        self.required_perms = set(perms)
 
        self.user_perms = None
 
        self.repo_name = None
 
        self.group_name = None
 

	
 
    def __call__(self, check_location=''):
 
        #TODO: put user as attribute here
 
        user = request.user
 
        cls_name = self.__class__.__name__
 
        check_scope = {
 
            'HasPermissionAll': '',
 
            'HasPermissionAny': '',
 
            'HasRepoPermissionAll': 'repo:%s' % self.repo_name,
 
            'HasRepoPermissionAny': 'repo:%s' % self.repo_name,
 
            'HasReposGroupPermissionAll': 'group:%s' % self.group_name,
 
            'HasReposGroupPermissionAny': 'group:%s' % self.group_name,
 
        }.get(cls_name, '?')
 
        log.debug('checking cls:%s %s usr:%s %s @ %s', cls_name,
 
                  self.required_perms, user, check_scope,
 
                  check_location or 'unspecified location')
 
        if not user:
 
            log.debug('Empty request user')
 
            return False
 
        self.user_perms = user.permissions
 
        if self.check_permissions():
 
            log.debug('Permission to %s granted for user: %s @ %s', self.repo_name, user,
 
                      check_location or 'unspecified location')
 
            return True
 

	
 
        else:
 
            log.debug('Permission to %s denied for user: %s @ %s', self.repo_name, user,
 
                        check_location or 'unspecified location')
 
            return False
 

	
 
    def check_permissions(self):
 
        """Dummy function for overriding"""
 
        raise Exception('You have to write this function in child class')
 

	
 

	
 
class HasPermissionAll(PermsFunction):
 
    def check_permissions(self):
 
        if self.required_perms.issubset(self.user_perms.get('global')):
 
            return True
 
        return False
 

	
 

	
 
class HasPermissionAny(PermsFunction):
 
    def check_permissions(self):
 
        if self.required_perms.intersection(self.user_perms.get('global')):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoPermissionAll(PermsFunction):
 
    def __call__(self, repo_name=None, check_location=''):
 
        self.repo_name = repo_name
 
        return super(HasRepoPermissionAll, self).__call__(check_location)
 

	
 
    def check_permissions(self):
 
        if not self.repo_name:
 
            self.repo_name = get_repo_slug(request)
 

	
 
        try:
 
            self._user_perms = set(
 
                [self.user_perms['repositories'][self.repo_name]]
 
            )
 
        except KeyError:
 
            return False
 
        if self.required_perms.issubset(self._user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoPermissionAny(PermsFunction):
 
    def __call__(self, repo_name=None, check_location=''):
 
        self.repo_name = repo_name
 
        return super(HasRepoPermissionAny, self).__call__(check_location)
 

	
 
    def check_permissions(self):
 
        if not self.repo_name:
 
            self.repo_name = get_repo_slug(request)
 

	
 
        try:
 
            self._user_perms = set(
 
                [self.user_perms['repositories'][self.repo_name]]
 
            )
 
        except KeyError:
 
            return False
 
        if self.required_perms.intersection(self._user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasReposGroupPermissionAny(PermsFunction):
 
    def __call__(self, group_name=None, check_location=''):
 
        self.group_name = group_name
 
        return super(HasReposGroupPermissionAny, self).__call__(check_location)
 

	
 
    def check_permissions(self):
 
        try:
 
            self._user_perms = set(
 
                [self.user_perms['repositories_groups'][self.group_name]]
 
            )
 
        except KeyError:
 
            return False
 
        if self.required_perms.intersection(self._user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasReposGroupPermissionAll(PermsFunction):
 
    def __call__(self, group_name=None, check_location=''):
 
        self.group_name = group_name
 
        return super(HasReposGroupPermissionAll, self).__call__(check_location)
 

	
 
    def check_permissions(self):
 
        try:
 
            self._user_perms = set(
 
                [self.user_perms['repositories_groups'][self.group_name]]
 
            )
 
        except KeyError:
 
            return False
 
        if self.required_perms.issubset(self._user_perms):
 
            return True
 
        return False
 

	
 

	
 
#==============================================================================
 
# SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
 
#==============================================================================
 
class HasPermissionAnyMiddleware(object):
 
    def __init__(self, *perms):
 
        self.required_perms = set(perms)
 

	
 
    def __call__(self, user, repo_name):
 
        # repo_name MUST be unicode, since we handle keys in permission
 
        # dict by unicode
 
        repo_name = safe_unicode(repo_name)
 
        usr = AuthUser(user.user_id)
 
        try:
 
            self.user_perms = set([usr.permissions['repositories'][repo_name]])
 
        except Exception:
 
            log.error('Exception while accessing permissions %s' %
 
                      traceback.format_exc())
 
            self.user_perms = set()
 
        self.username = user.username
 
        self.repo_name = repo_name
 
        return self.check_permissions()
 

	
 
    def check_permissions(self):
 
        log.debug('checking VCS protocol '
 
                  'permissions %s for user:%s repository:%s', self.user_perms,
 
                                                self.username, self.repo_name)
 
        if self.required_perms.intersection(self.user_perms):
 
            log.debug('permission granted for user:%s on repo:%s' % (
 
                          self.username, self.repo_name
 
                     )
 
            )
 
            return True
 
        log.debug('permission denied for user:%s on repo:%s' % (
 
                      self.username, self.repo_name
 
                 )
 
        )
 
        return False
 

	
 

	
 
#==============================================================================
 
# SPECIAL VERSION TO HANDLE API AUTH
 
#==============================================================================
 
class _BaseApiPerm(object):
 
    def __init__(self, *perms):
 
        self.required_perms = set(perms)
 

	
 
    def __call__(self, check_location='unspecified', user=None, repo_name=None):
 
        cls_name = self.__class__.__name__
 
        check_scope = 'user:%s, repo:%s' % (user, repo_name)
 
        log.debug('checking cls:%s %s %s @ %s', cls_name,
 
                  self.required_perms, check_scope, check_location)
 
        if not user:
 
            log.debug('Empty User passed into arguments')
 
            return False
 

	
 
        ## process user
 
        if not isinstance(user, AuthUser):
 
            user = AuthUser(user.user_id)
 

	
 
        if self.check_permissions(user.permissions, repo_name):
 
            log.debug('Permission to %s granted for user: %s @ %s', repo_name,
 
                      user, check_location)
 
            return True
 

	
 
        else:
 
            log.debug('Permission to %s denied for user: %s @ %s', repo_name,
 
                      user, check_location)
 
            return False
 

	
 
    def check_permissions(self, perm_defs, repo_name):
 
        """
 
        implement in child class should return True if permissions are ok,
 
        False otherwise
 

	
 
        :param perm_defs: dict with permission definitions
 
        :param repo_name: repo name
 
        """
 
        raise NotImplementedError()
 

	
 

	
 
class HasPermissionAllApi(_BaseApiPerm):
 
    def __call__(self, user, check_location=''):
 
        return super(HasPermissionAllApi, self)\
 
            .__call__(check_location=check_location, user=user)
 

	
 
    def check_permissions(self, perm_defs, repo):
 
        if self.required_perms.issubset(perm_defs.get('global')):
 
            return True
 
        return False
 

	
 

	
 
class HasPermissionAnyApi(_BaseApiPerm):
 
    def __call__(self, user, check_location=''):
 
        return super(HasPermissionAnyApi, self)\
 
            .__call__(check_location=check_location, user=user)
 

	
 
    def check_permissions(self, perm_defs, repo):
 
        if self.required_perms.intersection(perm_defs.get('global')):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoPermissionAllApi(_BaseApiPerm):
 
    def __call__(self, user, repo_name, check_location=''):
 
        return super(HasRepoPermissionAllApi, self)\
 
            .__call__(check_location=check_location, user=user,
 
                      repo_name=repo_name)
 

	
 
    def check_permissions(self, perm_defs, repo_name):
 

	
 
        try:
 
            self._user_perms = set(
 
                [perm_defs['repositories'][repo_name]]
 
            )
 
        except KeyError:
 
            log.warning(traceback.format_exc())
 
            return False
 
        if self.required_perms.issubset(self._user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoPermissionAnyApi(_BaseApiPerm):
 
    def __call__(self, user, repo_name, check_location=''):
 
        return super(HasRepoPermissionAnyApi, self)\
 
            .__call__(check_location=check_location, user=user,
 
                      repo_name=repo_name)
 

	
 
    def check_permissions(self, perm_defs, repo_name):
 

	
 
        try:
 
            _user_perms = set(
 
                [perm_defs['repositories'][repo_name]]
 
            )
 
        except KeyError:
 
            log.warning(traceback.format_exc())
 
            return False
 
        if self.required_perms.intersection(_user_perms):
 
            return True
 
        return False
 

	
 

	
 
def check_ip_access(source_ip, allowed_ips=None):
 
    """
 
    Checks if source_ip is a subnet of any of allowed_ips.
 

	
 
    :param source_ip:
 
    :param allowed_ips: list of allowed ips together with mask
 
    """
 
    from rhodecode.lib import ipaddr
 
    log.debug('checking if ip:%s is subnet of %s' % (source_ip, allowed_ips))
 
    if isinstance(allowed_ips, (tuple, list, set)):
 
        for ip in allowed_ips:
 
            try:
 
                if ipaddr.IPAddress(source_ip) in ipaddr.IPNetwork(ip):
 
                    return True
 
                # for any case we cannot determine the IP, don't crash just
 
                # skip it and log as error, we want to say forbidden still when
 
                # sending bad IP
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                continue
 
    return False
rhodecode/lib/helpers.py
Show inline comments
 
"""Helper functions
 

	
 
Consists of functions to typically be used within templates, but also
 
available to Controllers. This module is available to both as 'h'.
 
"""
 
import random
 
import hashlib
 
import StringIO
 
import urllib
 
import math
 
import logging
 
import re
 
import urlparse
 
import textwrap
 

	
 
from datetime import datetime
 
from pygments.formatters.html import HtmlFormatter
 
from pygments import highlight as code_highlight
 
from pylons import url, request, config
 
from pylons.i18n.translation import _, ungettext
 
from hashlib import md5
 

	
 
from webhelpers.html import literal, HTML, escape
 
from webhelpers.html.tools import *
 
from webhelpers.html.builder import make_tag
 
from webhelpers.html.tags import auto_discovery_link, checkbox, css_classes, \
 
    end_form, file, form, hidden, image, javascript_link, link_to, \
 
    link_to_if, link_to_unless, ol, required_legend, select, stylesheet_link, \
 
    submit, text, password, textarea, title, ul, xml_declaration, radio
 
from webhelpers.html.tools import auto_link, button_to, highlight, \
 
    js_obfuscate, mail_to, strip_links, strip_tags, tag_re
 
from webhelpers.number import format_byte_size, format_bit_size
 
from webhelpers.pylonslib import Flash as _Flash
 
from webhelpers.pylonslib.secure_form import secure_form
 
from webhelpers.text import chop_at, collapse, convert_accented_entities, \
 
    convert_misc_entities, lchop, plural, rchop, remove_formatting, \
 
    replace_whitespace, urlify, truncate, wrap_paragraphs
 
from webhelpers.date import time_ago_in_words
 
from webhelpers.paginate import Page
 
from webhelpers.html.tags import _set_input_attrs, _set_id_attr, \
 
    convert_boolean_attrs, NotGiven, _make_safe_id_component
 

	
 
from rhodecode.lib.annotate import annotate_highlight
 
from rhodecode.lib.utils import repo_name_slug, get_custom_lexer
 
from rhodecode.lib.utils2 import str2bool, safe_unicode, safe_str, \
 
    get_changeset_safe, datetime_to_time, time_to_datetime, AttributeDict
 
from rhodecode.lib.markup_renderer import MarkupRenderer
 
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from rhodecode.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from rhodecode.config.conf import DATE_FORMAT, DATETIME_FORMAT
 
from rhodecode.model.changeset_status import ChangesetStatusModel
 
from rhodecode.model.db import URL_SEP, Permission
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
html_escape_table = {
 
    "&": "&amp;",
 
    '"': "&quot;",
 
    "'": "&apos;",
 
    ">": "&gt;",
 
    "<": "&lt;",
 
}
 

	
 

	
 
def html_escape(text):
 
    """Produce entities within text."""
 
    return "".join(html_escape_table.get(c, c) for c in text)
 

	
 

	
 
def shorter(text, size=20):
 
    postfix = '...'
 
    if len(text) > size:
 
        return text[:size - len(postfix)] + postfix
 
    return text
 

	
 

	
 
def _reset(name, value=None, id=NotGiven, type="reset", **attrs):
 
    """
 
    Reset button
 
    """
 
    _set_input_attrs(attrs, type, name, value)
 
    _set_id_attr(attrs, id, name)
 
    convert_boolean_attrs(attrs, ["disabled"])
 
    return HTML.input(**attrs)
 

	
 
reset = _reset
 
safeid = _make_safe_id_component
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a uniqe ID for filenode based on it's hash of path and revision
 
    it's safe to use in urls
 

	
 
    :param raw_id:
 
    :param path:
 
    """
 

	
 
    return 'C-%s-%s' % (short_id(raw_id), md5(safe_str(path)).hexdigest()[:12])
 

	
 

	
 
def get_token():
 
    """Return the current authentication token, creating one if one doesn't
 
    already exist.
 
    """
 
    token_key = "_authentication_token"
 
    from pylons import session
 
    if not token_key in session:
 
        try:
 
            token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
 
        except AttributeError:  # Python < 2.4
 
            token = hashlib.sha1(str(random.randrange(2 ** 128))).hexdigest()
 
        session[token_key] = token
 
        if hasattr(session, 'save'):
 
            session.save()
 
    return session[token_key]
 

	
 

	
 
class _GetError(object):
 
    """Get error from form_errors, and represent it as span wrapped error
 
    message
 

	
 
    :param field_name: field to fetch errors for
 
    :param form_errors: form errors dict
 
    """
 

	
 
    def __call__(self, field_name, form_errors):
 
        tmpl = """<span class="error_msg">%s</span>"""
 
        if form_errors and field_name in form_errors:
 
            return literal(tmpl % form_errors.get(field_name))
 

	
 
get_error = _GetError()
 

	
 

	
 
class _ToolTip(object):
 

	
 
    def __call__(self, tooltip_title, trim_at=50):
 
        """
 
        Special function just to wrap our text into nice formatted
 
        autowrapped text
 

	
 
        :param tooltip_title:
 
        """
 
        tooltip_title = escape(tooltip_title)
 
        tooltip_title = tooltip_title.replace('<', '&lt;').replace('>', '&gt;')
 
        return tooltip_title
 
tooltip = _ToolTip()
 

	
 

	
 
class _FilesBreadCrumbs(object):
 

	
 
    def __call__(self, repo_name, rev, paths):
 
        if isinstance(paths, str):
 
            paths = safe_unicode(paths)
 
        url_l = [link_to(repo_name, url('files_home',
 
                                        repo_name=repo_name,
 
                                        revision=rev, f_path=''),
 
                         class_='ypjax-link')]
 
        paths_l = paths.split('/')
 
        for cnt, p in enumerate(paths_l):
 
            if p != '':
 
                url_l.append(link_to(p,
 
                                     url('files_home',
 
                                         repo_name=repo_name,
 
                                         revision=rev,
 
                                         f_path='/'.join(paths_l[:cnt + 1])
 
                                         ),
 
                                     class_='ypjax-link'
 
                                     )
 
                             )
 

	
 
        return literal('/'.join(url_l))
 

	
 
files_breadcrumbs = _FilesBreadCrumbs()
 

	
 

	
 
class CodeHtmlFormatter(HtmlFormatter):
 
    """
 
    My code Html Formatter for source codes
 
    """
 

	
 
    def wrap(self, source, outfile):
 
        return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
 

	
 
    def _wrap_code(self, source):
 
        for cnt, it in enumerate(source):
 
            i, t = it
 
            t = '<div id="L%s">%s</div>' % (cnt + 1, t)
 
            yield i, t
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        dummyoutfile = StringIO.StringIO()
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            dummyoutfile.write(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        nocls = self.noclasses
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s%d" class="special">%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        # in case you wonder about the seemingly redundant <div> here: since the
 
        # content in the other cell also is wrapped in a div, some browsers in
 
        # some configurations seem to mess up the formatting...
 
        if nocls:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td><div class="linenodiv" '
 
                      'style="background-color: #f0f0f0; padding-right: 10px">'
 
                      '<pre style="line-height: 125%">' +
 
                      ls + '</pre></div></td><td id="hlcode" class="code">')
 
        else:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td class="linenos"><div class="linenodiv"><pre>' +
 
                      ls + '</pre></div></td><td id="hlcode" class="code">')
 
        yield 0, dummyoutfile.getvalue()
 
        yield 0, '</td></tr></table>'
 

	
 

	
 
def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(code_highlight(filenode.content, lexer,
 
                                  CodeHtmlFormatter(**kwargs)))
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 

	
 
    color_dict = {}
 

	
 
    def gen_color(n=10000):
 
        """generator for getting n of evenly distributed colors using
 
        hsv color and golden ratio. It always return same order of colors
 

	
 
        :returns: RGB tuple
 
        """
 

	
 
        def hsv_to_rgb(h, s, v):
 
            if s == 0.0:
 
                return v, v, v
 
            i = int(h * 6.0)  # XXX assume int() truncates!
 
            f = (h * 6.0) - i
 
            p = v * (1.0 - s)
 
            q = v * (1.0 - s * f)
 
            t = v * (1.0 - s * (1.0 - f))
 
            i = i % 6
 
            if i == 0:
 
                return v, t, p
 
            if i == 1:
 
                return q, v, p
 
            if i == 2:
 
                return p, v, t
 
            if i == 3:
 
                return p, q, v
 
            if i == 4:
 
                return t, p, v
 
            if i == 5:
 
                return v, p, q
 

	
 
        golden_ratio = 0.618033988749895
 
        h = 0.22717784590367374
 

	
 
        for _ in xrange(n):
 
            h += golden_ratio
 
            h %= 1
 
            HSV_tuple = [h, 0.95, 0.95]
 
            RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
            yield map(lambda x: str(int(x * 256)), RGB_tuple)
 

	
 
    cgenerator = gen_color()
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = cgenerator.next()
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(repo_name):
 

	
 
        def _url_func(changeset):
 
            author = changeset.author
 
            date = changeset.date
 
            message = tooltip(changeset.message)
 

	
 
            tooltip_html = ("<div style='font-size:0.8em'><b>Author:</b>"
 
                            " %s<br/><b>Date:</b> %s</b><br/><b>Message:"
 
                            "</b> %s<br/></div>")
 

	
 
            tooltip_html = tooltip_html % (author, date, message)
 
            lnk_format = '%5s:%s' % ('r%s' % changeset.revision,
 
                                     short_id(changeset.raw_id))
 
            uri = link_to(
 
                    lnk_format,
 
                    url('changeset_home', repo_name=repo_name,
 
                        revision=changeset.raw_id),
 
                    style=get_color_string(changeset.raw_id),
 
                    class_='tooltip',
 
                    title=tooltip_html
 
                  )
 

	
 
            uri += '\n'
 
            return uri
 
        return _url_func
 

	
 
    return literal(annotate_highlight(filenode, url_func(repo_name), **kwargs))
 

	
 

	
 
def is_following_repo(repo_name, user_id):
 
    from rhodecode.model.scm import ScmModel
 
    return ScmModel().is_following_repo(repo_name, user_id)
 

	
 
flash = _Flash()
 

	
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from rhodecode.lib.vcs.utils import author_name, author_email
 
from rhodecode.lib.utils2 import credentials_filter, age as _age
 
from rhodecode.model.db import User, ChangesetStatus
 

	
 
age = lambda  x: _age(x)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        _fmt = _(u"%a, %d %b %Y %H:%M:%S").encode('utf8')
 
        return date.strftime(_fmt).decode('utf8')
 

	
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
def email_or_none(author):
 
    # extract email from the commit string
 
    _email = email(author)
 
    if _email != '':
 
        # check it against RhodeCode database, and use the MAIN email for this
 
        # user
 
        user = User.get_by_email(_email, case_insensitive=True, cache=True)
 
        if user is not None:
 
            return user.email
 
        return _email
 

	
 
    # See if it contains a username we can get an email from
 
    user = User.get_by_username(author_name(author), case_insensitive=True,
 
                                cache=True)
 
    if user is not None:
 
        return user.email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author, show_attr="username_and_name"):
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # Valid email in the attribute passed, see if they're in the system
 
    _email = email(author)
 
    if _email != '':
 
        user = User.get_by_email(_email, case_insensitive=True, cache=True)
 
        if user is not None:
 
            return person_getter(user)
 
        return _email
 

	
 
    # Maybe it's a username?
 
    _author = author_name(author)
 
    user = User.get_by_username(_author, case_insensitive=True,
 
                                cache=True)
 
    if user is not None:
 
        return person_getter(user)
 

	
 
    # Still nothing?  Just pass back the author name then
 
    return _author
 

	
 

	
 
def person_by_id(id_, show_attr="username_and_name"):
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    #maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = User.get(id_)
 
        if user is not None:
 
            return person_getter(user)
 
    return id_
 

	
 

	
 
def desc_stylize(value):
 
    """
 
    converts tags from value into html equivalent
 

	
 
    :param value:
 
    """
 
    value = re.sub(r'\[see\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
 
                   '<div class="metatag" tag="see">see =&gt; \\1 </div>', value)
 
    value = re.sub(r'\[license\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
 
                   '<div class="metatag" tag="license"><a href="http:\/\/www.opensource.org/licenses/\\1">\\1</a></div>', value)
 
    value = re.sub(r'\[(requires|recommends|conflicts|base)\ \=\>\ *([a-zA-Z0-9\-\/]*)\]',
 
                   '<div class="metatag" tag="\\1">\\1 =&gt; <a href="/\\2">\\2</a></div>', value)
 
    value = re.sub(r'\[(lang|language)\ \=\>\ *([a-zA-Z\-\/\#\+]*)\]',
 
                   '<div class="metatag" tag="lang">\\2</div>', value)
 
    value = re.sub(r'\[([a-z]+)\]',
 
                  '<div class="metatag" tag="\\1">\\1</div>', value)
 

	
 
    return value
 

	
 

	
 
def bool2icon(value):
 
    """Returns True/False values represented as small html image of true/false
 
    icons
 

	
 
    :param value: bool value
 
    """
 

	
 
    if value is True:
 
        return HTML.tag('img', src=url("/images/icons/accept.png"),
 
                        alt=_('True'))
 

	
 
    if value is False:
 
        return HTML.tag('img', src=url("/images/icons/cancel.png"),
 
                        alt=_('False'))
 

	
 
    return value
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        def lnk(rev, repo_name):
 
            if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
 
                lazy_cs = True
 
                if getattr(rev, 'op', None) and getattr(rev, 'ref_name', None):
 
                    lazy_cs = False
 
                    lbl = '?'
 
                    if rev.op == 'delete_branch':
 
                        lbl = '%s' % _('Deleted branch: %s') % rev.ref_name
 
                        title = ''
 
                    elif rev.op == 'tag':
 
                        lbl = '%s' % _('Created tag: %s') % rev.ref_name
 
                        title = ''
 
                    _url = '#'
 

	
 
                else:
 
                    lbl = '%s' % (rev.short_id[:8])
 
                    _url = url('changeset_home', repo_name=repo_name,
 
                               revision=rev.raw_id)
 
                    title = tooltip(rev.message)
 
            else:
 
                ## changeset cannot be found/striped/removed etc.
 
                lbl = ('%s' % rev)[:12]
 
                _url = '#'
 
                title = _('Changeset not found')
 
            if parse_cs:
 
                return link_to(lbl, _url, title=title, class_='tooltip')
 
            return link_to(lbl, _url, raw_id=rev.raw_id, repo_name=repo_name,
 
                           class_='lazy-cs' if lazy_cs else '')
 

	
 
        revs = []
 
        if len(filter(lambda v: v != '', revs_ids)) > 0:
 
            repo = None
 
            for rev in revs_ids[:revs_top_limit]:
 
                _op = _name = None
 
                if len(rev.split('=>')) == 2:
 
                    _op, _name = rev.split('=>')
 

	
 
                # we want parsed changesets, or new log store format is bad
 
                if parse_cs:
 
                    try:
 
                        if repo is None:
 
                            repo = user_log.repository.scm_instance
 
                        _rev = repo.get_changeset(rev)
 
                        revs.append(_rev)
 
                    except ChangesetDoesNotExistError:
 
                        log.error('cannot find revision %s in this repo' % rev)
 
                        revs.append(rev)
 
                        continue
 
                else:
 
                    _rev = AttributeDict({
 
                        'short_id': rev[:12],
 
                        'raw_id': rev,
 
                        'message': '',
 
                        'op': _op,
 
                        'ref_name': _name
 
                    })
 
                    revs.append(_rev)
 
        cs_links = []
 
        cs_links.append(" " + ', '.join(
 
            [lnk(rev, repo_name) for rev in revs[:revs_limit]]
 
            )
 
        )
 

	
 
        compare_view = (
 
            ' <div class="compare_view tooltip" title="%s">'
 
            '<a href="%s">%s</a> </div>' % (
 
                _('Show all combined changesets %s->%s') % (
 
                    revs_ids[0][:12], revs_ids[-1][:12]
 
                ),
 
                url('changeset_home', repo_name=repo_name,
 
                    revision='%s...%s' % (revs_ids[0], revs_ids[-1])
 
                ),
 
                _('compare view')
 
            )
 
        )
 

	
 
        # if we have exactly one more than normally displayed
 
        # just display it, takes less space than displaying
 
        # "and 1 more revisions"
 
        if len(revs_ids) == revs_limit + 1:
 
            rev = revs[revs_limit]
 
            cs_links.append(", " + lnk(rev, repo_name))
 

	
 
        # hidden-by-default ones
 
        if len(revs_ids) > revs_limit + 1:
 
            uniq_id = revs_ids[0]
 
            html_tmpl = (
 
                '<span> %s <a class="show_more" id="_%s" '
 
                'href="#more">%s</a> %s</span>'
 
            )
 
            if not feed:
 
                cs_links.append(html_tmpl % (
 
                      _('and'),
 
                      uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
 
                      _('revisions')
 
                    )
 
                )
 

	
 
            if not feed:
 
                html_tmpl = '<span id="%s" style="display:none">, %s </span>'
 
            else:
 
                html_tmpl = '<span id="%s"> %s </span>'
 

	
 
            morelinks = ', '.join(
 
              [lnk(rev, repo_name) for rev in revs[revs_limit:]]
 
            )
 

	
 
            if len(revs_ids) > revs_top_limit:
 
                morelinks += ', ...'
 

	
 
            cs_links.append(html_tmpl % (uniq_id, morelinks))
 
        if len(revs) > 1:
 
            cs_links.append(compare_view)
 
        return ''.join(cs_links)
 

	
 
    def get_fork_name():
 
        repo_name = action_params
 
        _url = url('summary_home', repo_name=repo_name)
 
        return _('fork name %s') % link_to(action_params, _url)
 

	
 
    def get_user_name():
 
        user_name = action_params
 
        return user_name
 

	
 
    def get_users_group():
 
        group_name = action_params
 
        return group_name
 

	
 
    def get_pull_request():
 
        pull_request_id = action_params
 
        deleted = user_log.repository is None
 
        if deleted:
 
            repo_name = user_log.repository_name
 
        else:
 
            repo_name = user_log.repository.repo_name
 
        return link_to(_('Pull request #%s') % pull_request_id,
 
                    url('pullrequest_show', repo_name=repo_name,
 
                    pull_request_id=pull_request_id))
 

	
 
    # action : translated str, callback(extractor), icon
 
    action_map = {
 
    'user_deleted_repo':           (_('[deleted] repository'),
 
                                    None, 'database_delete.png'),
 
    'user_created_repo':           (_('[created] repository'),
 
                                    None, 'database_add.png'),
 
    'user_created_fork':           (_('[created] repository as fork'),
 
                                    None, 'arrow_divide.png'),
 
    'user_forked_repo':            (_('[forked] repository'),
 
                                    get_fork_name, 'arrow_divide.png'),
 
    'user_updated_repo':           (_('[updated] repository'),
 
                                    None, 'database_edit.png'),
 
    'admin_deleted_repo':          (_('[delete] repository'),
 
                                    None, 'database_delete.png'),
 
    'admin_created_repo':          (_('[created] repository'),
 
                                    None, 'database_add.png'),
 
    'admin_forked_repo':           (_('[forked] repository'),
 
                                    None, 'arrow_divide.png'),
 
    'admin_updated_repo':          (_('[updated] repository'),
 
                                    None, 'database_edit.png'),
 
    'admin_created_user':          (_('[created] user'),
 
                                    get_user_name, 'user_add.png'),
 
    'admin_updated_user':          (_('[updated] user'),
 
                                    get_user_name, 'user_edit.png'),
 
    'admin_created_users_group':   (_('[created] users group'),
 
    'admin_created_users_group':   (_('[created] user group'),
 
                                    get_users_group, 'group_add.png'),
 
    'admin_updated_users_group':   (_('[updated] users group'),
 
    'admin_updated_users_group':   (_('[updated] user group'),
 
                                    get_users_group, 'group_edit.png'),
 
    'user_commented_revision':     (_('[commented] on revision in repository'),
 
                                    get_cs_links, 'comment_add.png'),
 
    'user_commented_pull_request': (_('[commented] on pull request for'),
 
                                    get_pull_request, 'comment_add.png'),
 
    'user_closed_pull_request':    (_('[closed] pull request for'),
 
                                    get_pull_request, 'tick.png'),
 
    'push':                        (_('[pushed] into'),
 
                                    get_cs_links, 'script_add.png'),
 
    'push_local':                  (_('[committed via RhodeCode] into repository'),
 
                                    get_cs_links, 'script_edit.png'),
 
    'push_remote':                 (_('[pulled from remote] into repository'),
 
                                    get_cs_links, 'connect.png'),
 
    'pull':                        (_('[pulled] from'),
 
                                    None, 'down_16.png'),
 
    'started_following_repo':      (_('[started following] repository'),
 
                                    None, 'heart_add.png'),
 
    'stopped_following_repo':      (_('[stopped following] repository'),
 
                                    None, 'heart_delete.png'),
 
    }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0]\
 
            .replace('[', '<span class="journal_highlight">')\
 
            .replace(']', '</span>')
 

	
 
    action_params_func = lambda: ""
 

	
 
    if callable(action_str[1]):
 
        action_params_func = action_str[1]
 

	
 
    def action_parser_icon():
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        tmpl = """<img src="%s%s" alt="%s"/>"""
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        return literal(tmpl % ((url('/images/icons/')), ico, action))
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 

	
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from rhodecode.lib.auth import HasPermissionAny, HasPermissionAll, \
 
HasRepoPermissionAny, HasRepoPermissionAll, HasReposGroupPermissionAll, \
 
HasReposGroupPermissionAny
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 

	
 
def gravatar_url(email_address, size=30):
 
    from pylons import url  # doh, we need to re-import url to mock it later
 
    _def = 'anonymous@rhodecode.org'
 
    use_gravatar = str2bool(config['app_conf'].get('use_gravatar'))
 
    email_address = email_address or _def
 
    if (not use_gravatar or not email_address or email_address == _def):
 
        f = lambda a, l: min(l, key=lambda x: abs(x - a))
 
        return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))
 

	
 
    if use_gravatar and config['app_conf'].get('alternative_gravatar_url'):
 
        tmpl = config['app_conf'].get('alternative_gravatar_url', '')
 
        parsed_url = urlparse.urlparse(url.current(qualified=True))
 
        tmpl = tmpl.replace('{email}', email_address)\
 
                   .replace('{md5email}', hashlib.md5(email_address.lower()).hexdigest()) \
 
                   .replace('{netloc}', parsed_url.netloc)\
 
                   .replace('{scheme}', parsed_url.scheme)\
 
                   .replace('{size}', str(size))
 
        return tmpl
 

	
 
    ssl_enabled = 'https' == request.environ.get('wsgi.url_scheme')
 
    default = 'identicon'
 
    baseurl_nossl = "http://www.gravatar.com/avatar/"
 
    baseurl_ssl = "https://secure.gravatar.com/avatar/"
 
    baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl
 

	
 
    if isinstance(email_address, unicode):
 
        #hashlib crashes on unicode items
 
        email_address = safe_str(email_address)
 
    # construct the url
 
    gravatar_url = baseurl + hashlib.md5(email_address.lower()).hexdigest() + "?"
 
    gravatar_url += urllib.urlencode({'d': default, 's': str(size)})
 

	
 
    return gravatar_url
 

	
 

	
 
#==============================================================================
 
# REPO PAGER, PAGER FOR REPOSITORY
 
#==============================================================================
 
class RepoPage(Page):
 

	
 
    def __init__(self, collection, page=1, items_per_page=20,
 
                 item_count=None, url=None, **kwargs):
 

	
 
        """Create a "RepoPage" instance. special pager for paging
 
        repository
 
        """
 
        self._url_generator = url
 

	
 
        # Safe the kwargs class-wide so they can be used in the pager() method
 
        self.kwargs = kwargs
 

	
 
        # Save a reference to the collection
 
        self.original_collection = collection
 

	
 
        self.collection = collection
 

	
 
        # The self.page is the number of the current page.
 
        # The first page has the number 1!
 
        try:
 
            self.page = int(page)  # make it int() if we get it as a string
 
        except (ValueError, TypeError):
 
            self.page = 1
 

	
 
        self.items_per_page = items_per_page
 

	
 
        # Unless the user tells us how many items the collections has
 
        # we calculate that ourselves.
 
        if item_count is not None:
 
            self.item_count = item_count
 
        else:
 
            self.item_count = len(self.collection)
 

	
 
        # Compute the number of the first and last available page
 
        if self.item_count > 0:
 
            self.first_page = 1
 
            self.page_count = int(math.ceil(float(self.item_count) /
 
                                            self.items_per_page))
 
            self.last_page = self.first_page + self.page_count - 1
 

	
 
            # Make sure that the requested page number is the range of
 
            # valid pages
 
            if self.page > self.last_page:
 
                self.page = self.last_page
 
            elif self.page < self.first_page:
 
                self.page = self.first_page
 

	
 
            # Note: the number of items on this page can be less than
 
            #       items_per_page if the last page is not full
 
            self.first_item = max(0, (self.item_count) - (self.page *
 
                                                          items_per_page))
 
            self.last_item = ((self.item_count - 1) - items_per_page *
 
                              (self.page - 1))
 

	
 
            self.items = list(self.collection[self.first_item:self.last_item + 1])
 

	
 
            # Links to previous and next page
 
            if self.page > self.first_page:
 
                self.previous_page = self.page - 1
 
            else:
 
                self.previous_page = None
 

	
 
            if self.page < self.last_page:
 
                self.next_page = self.page + 1
 
            else:
 
                self.next_page = None
 

	
 
        # No items available
 
        else:
 
            self.first_page = None
 
            self.page_count = 0
 
            self.last_page = None
 
            self.first_item = None
 
            self.last_item = None
 
            self.previous_page = None
 
            self.next_page = None
 
            self.items = []
 

	
 
        # This is a subclass of the 'list' type. Initialise the list now.
 
        list.__init__(self, reversed(self.items))
 

	
 

	
 
def changed_tooltip(nodes):
 
    """
 
    Generates a html string for changed nodes in changeset page.
 
    It limits the output to 30 entries
 

	
 
    :param nodes: LazyNodesGenerator
 
    """
 
    if nodes:
 
        pref = ': <br/> '
 
        suf = ''
 
        if len(nodes) > 30:
 
            suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
 
        return literal(pref + '<br/> '.join([safe_unicode(x.path)
 
                                             for x in nodes[:30]]) + suf)
 
    else:
 
        return ': ' + _('No Files')
 

	
 

	
 
def repo_link(groups_and_repos, last_url=None):
 
    """
 
    Makes a breadcrumbs link to repo within a group
 
    joins &raquo; on each group to create a fancy link
 

	
 
    ex::
 
        group >> subgroup >> repo
 

	
 
    :param groups_and_repos:
 
    :param last_url:
 
    """
 
    groups, repo_name = groups_and_repos
 
    last_link = link_to(repo_name, last_url) if last_url else repo_name
 

	
 
    if not groups:
 
        if last_url:
 
            return last_link
 
        return repo_name
 
    else:
 
        def make_link(group):
 
            return link_to(group.name,
 
                           url('repos_group_home', group_name=group.group_name))
 
        return literal(' &raquo; '.join(map(make_link, groups) + [last_link]))
 

	
 

	
 
def fancy_file_stats(stats):
 
    """
 
    Displays a fancy two colored bar for number of added/deleted
 
    lines of code on file
 

	
 
    :param stats: two element list of added/deleted lines of code
 
    """
 
    def cgen(l_type, a_v, d_v):
 
        mapping = {'tr': 'top-right-rounded-corner-mid',
 
                   'tl': 'top-left-rounded-corner-mid',
 
                   'br': 'bottom-right-rounded-corner-mid',
 
                   'bl': 'bottom-left-rounded-corner-mid'}
 
        map_getter = lambda x: mapping[x]
 

	
 
        if l_type == 'a' and d_v:
 
            #case when added and deleted are present
 
            return ' '.join(map(map_getter, ['tl', 'bl']))
 

	
 
        if l_type == 'a' and not d_v:
 
            return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
 

	
 
        if l_type == 'd' and a_v:
 
            return ' '.join(map(map_getter, ['tr', 'br']))
 

	
 
        if l_type == 'd' and not a_v:
 
            return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
 

	
 
    a, d = stats[0], stats[1]
 
    width = 100
 

	
 
    if a == 'b':
 
        #binary mode
 
        b_d = '<div class="bin%s %s" style="width:100%%">%s</div>' % (d, cgen('a', a_v='', d_v=0), 'bin')
 
        b_a = '<div class="bin1" style="width:0%%">%s</div>' % ('bin')
 
        return literal('<div style="width:%spx">%s%s</div>' % (width, b_a, b_d))
 

	
 
    t = stats[0] + stats[1]
 
    unit = float(width) / (t or 1)
 

	
 
    # needs > 9% of width to be visible or 0 to be hidden
 
    a_p = max(9, unit * a) if a > 0 else 0
 
    d_p = max(9, unit * d) if d > 0 else 0
 
    p_sum = a_p + d_p
 

	
 
    if p_sum > width:
 
        #adjust the percentage to be == 100% since we adjusted to 9
 
        if a_p > d_p:
 
            a_p = a_p - (p_sum - width)
 
        else:
 
            d_p = d_p - (p_sum - width)
 

	
 
    a_v = a if a > 0 else ''
 
    d_v = d if d > 0 else ''
 

	
 
    d_a = '<div class="added %s" style="width:%s%%">%s</div>' % (
 
        cgen('a', a_v, d_v), a_p, a_v
 
    )
 
    d_d = '<div class="deleted %s" style="width:%s%%">%s</div>' % (
 
        cgen('d', a_v, d_v), d_p, d_v
 
    )
 
    return literal('<div style="width:%spx">%s%s</div>' % (width, d_a, d_d))
 

	
 

	
 
def urlify_text(text_, safe=True):
 
    """
 
    Extrac urls from text and make html links out of them
 

	
 
    :param text_:
 
    """
 

	
 
    url_pat = re.compile(r'''(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]'''
 
                         '''|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)''')
 

	
 
    def url_func(match_obj):
 
        url_full = match_obj.groups()[0]
 
        return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
 
    _newtext = url_pat.sub(url_func, text_)
 
    if safe:
 
        return literal(_newtext)
 
    return _newtext
 

	
 

	
 
def urlify_changesets(text_, repository):
 
    """
 
    Extract revision ids from changeset and make link from them
 

	
 
    :param text_:
 
    :param repository: repo name to build the URL with
 
    """
 
    from pylons import url  # doh, we need to re-import url to mock it later
 
    URL_PAT = re.compile(r'(^|\s)([0-9a-fA-F]{12,40})($|\s)')
 

	
 
    def url_func(match_obj):
 
        rev = match_obj.groups()[1]
 
        pref = match_obj.groups()[0]
 
        suf = match_obj.groups()[2]
 

	
 
        tmpl = (
 
        '%(pref)s<a class="%(cls)s" href="%(url)s">'
 
        '%(rev)s</a>%(suf)s'
 
        )
 
        return tmpl % {
 
         'pref': pref,
 
         'cls': 'revision-link',
 
         'url': url('changeset_home', repo_name=repository, revision=rev),
 
         'rev': rev,
 
         'suf': suf
 
        }
 

	
 
    newtext = URL_PAT.sub(url_func, text_)
 

	
 
    return newtext
 

	
 

	
 
def urlify_commit(text_, repository=None, link_=None):
 
    """
 
    Parses given text message and makes proper links.
 
    issues are linked to given issue-server, and rest is a changeset link
 
    if link_ is given, in other case it's a plain text
 

	
 
    :param text_:
 
    :param repository:
 
    :param link_: changeset link
 
    """
 
    import traceback
 
    from pylons import url  # doh, we need to re-import url to mock it later
 

	
 
    def escaper(string):
 
        return string.replace('<', '&lt;').replace('>', '&gt;')
 

	
 
    def linkify_others(t, l):
 
        urls = re.compile(r'(\<a.*?\<\/a\>)',)
 
        links = []
 
        for e in urls.split(t):
 
            if not urls.match(e):
 
                links.append('<a class="message-link" href="%s">%s</a>' % (l, e))
 
            else:
 
                links.append(e)
 

	
 
        return ''.join(links)
 

	
 
    # urlify changesets - extrac revisions and make link out of them
 
    newtext = urlify_changesets(escaper(text_), repository)
 

	
 
    # extract http/https links and make them real urls
 
    newtext = urlify_text(newtext, safe=False)
 

	
 
    try:
 
        from rhodecode import CONFIG
 
        conf = CONFIG
 

	
 
        # allow multiple issue servers to be used
 
        valid_indices = [
 
            x.group(1)
 
            for x in map(lambda x: re.match(r'issue_pat(.*)', x), conf.keys())
 
            if x and 'issue_server_link%s' % x.group(1) in conf
 
            and 'issue_prefix%s' % x.group(1) in conf
 
        ]
 

	
 
        log.debug('found issue server suffixes `%s` during valuation of: %s'
 
                  % (','.join(valid_indices), newtext))
 

	
 
        for pattern_index in valid_indices:
 
            ISSUE_PATTERN = conf.get('issue_pat%s' % pattern_index)
 
            ISSUE_SERVER_LNK = conf.get('issue_server_link%s' % pattern_index)
 
            ISSUE_PREFIX = conf.get('issue_prefix%s' % pattern_index)
 

	
 
            log.debug('pattern suffix `%s` PAT:%s SERVER_LINK:%s PREFIX:%s'
 
                      % (pattern_index, ISSUE_PATTERN, ISSUE_SERVER_LNK,
 
                         ISSUE_PREFIX))
 

	
 
            URL_PAT = re.compile(r'%s' % ISSUE_PATTERN)
 

	
 
            def url_func(match_obj):
 
                pref = ''
 
                if match_obj.group().startswith(' '):
 
                    pref = ' '
 

	
 
                issue_id = ''.join(match_obj.groups())
 
                tmpl = (
 
                '%(pref)s<a class="%(cls)s" href="%(url)s">'
 
                '%(issue-prefix)s%(id-repr)s'
 
                '</a>'
 
                )
 
                url = ISSUE_SERVER_LNK.replace('{id}', issue_id)
 
                if repository:
 
                    url = url.replace('{repo}', repository)
 
                    repo_name = repository.split(URL_SEP)[-1]
 
                    url = url.replace('{repo_name}', repo_name)
 

	
 
                return tmpl % {
 
                     'pref': pref,
 
                     'cls': 'issue-tracker-link',
 
                     'url': url,
 
                     'id-repr': issue_id,
 
                     'issue-prefix': ISSUE_PREFIX,
 
                     'serv': ISSUE_SERVER_LNK,
 
                }
 
            newtext = URL_PAT.sub(url_func, newtext)
 
            log.debug('processed prefix:`%s` => %s' % (pattern_index, newtext))
 

	
 
        # if we actually did something above
 
        if link_:
 
            # wrap not links into final link => link_
 
            newtext = linkify_others(newtext, link_)
 
    except:
 
        log.error(traceback.format_exc())
 
        pass
 

	
 
    return literal(newtext)
 

	
 

	
 
def rst(source):
 
    return literal('<div class="rst-block">%s</div>' %
 
                   MarkupRenderer.rst(source))
 

	
 

	
 
def rst_w_mentions(source):
 
    """
 
    Wrapped rst renderer with @mention highlighting
 

	
 
    :param source:
 
    """
 
    return literal('<div class="rst-block">%s</div>' %
 
                   MarkupRenderer.rst_with_mentions(source))
 

	
 

	
 
def changeset_status(repo, revision):
 
    return ChangesetStatusModel().get_status(repo, revision)
 

	
 

	
 
def changeset_status_lbl(changeset_status):
 
    return dict(ChangesetStatus.STATUSES).get(changeset_status)
 

	
 

	
 
def get_permission_name(key):
 
    return dict(Permission.PERMS).get(key)
 

	
 

	
 
def journal_filter_help():
 
    return _(textwrap.dedent('''
 
        Example filter terms:
 
            repository:vcs
 
            username:marcin
 
            action:*push*
 
            ip:127.0.0.1
 
            date:20120101
 
            date:[20120101100000 TO 20120102]
 

	
 
        Generate wildcards using '*' character:
 
            "repositroy:vcs*" - search everything starting with 'vcs'
 
            "repository:*vcs*" - search for repository containing 'vcs'
 

	
 
        Optional AND / OR operators in queries
 
            "repository:vcs OR repository:test"
 
            "username:test AND repository:test*"
 
    '''))
 

	
 

	
 
def not_mapped_error(repo_name):
 
    flash(_('%s repository is not mapped to db perhaps'
 
            ' it was created or renamed from the filesystem'
 
            ' please run the application again'
 
            ' in order to rescan repositories') % repo_name, category='error')
 

	
 

	
 
def ip_range(ip_addr):
 
    from rhodecode.model.db import UserIpMap
 
    s, e = UserIpMap._get_ip_range(ip_addr)
 
    return '%s - %s' % (s, e)
rhodecode/model/repo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.model.repo
 
    ~~~~~~~~~~~~~~~~~~~~
 

	
 
    Repository model for rhodecode
 

	
 
    :created_on: Jun 5, 2010
 
    :author: marcink
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
from __future__ import with_statement
 
import os
 
import shutil
 
import logging
 
import traceback
 
from datetime import datetime
 

	
 
from rhodecode.lib.vcs.backends import get_backend
 
from rhodecode.lib.compat import json
 
from rhodecode.lib.utils2 import LazyProperty, safe_str, safe_unicode,\
 
    remove_prefix
 
from rhodecode.lib.caching_query import FromCache
 
from rhodecode.lib.hooks import log_create_repository, log_delete_repository
 

	
 
from rhodecode.model import BaseModel
 
from rhodecode.model.db import Repository, UserRepoToPerm, User, Permission, \
 
    Statistics, UsersGroup, UsersGroupRepoToPerm, RhodeCodeUi, RepoGroup,\
 
    RhodeCodeSetting, RepositoryField
 
from rhodecode.lib import helpers as h
 
from rhodecode.lib.auth import HasRepoPermissionAny
 
from rhodecode.lib.vcs.backends.base import EmptyChangeset
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoModel(BaseModel):
 

	
 
    cls = Repository
 
    URL_SEPARATOR = Repository.url_sep()
 

	
 
    def __get_users_group(self, users_group):
 
        return self._get_instance(UsersGroup, users_group,
 
                                  callback=UsersGroup.get_by_group_name)
 

	
 
    def _get_repos_group(self, repos_group):
 
        return self._get_instance(RepoGroup, repos_group,
 
                                  callback=RepoGroup.get_by_group_name)
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Get's the repositories root path from database
 
        """
 

	
 
        q = self.sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
 
        return q.ui_value
 

	
 
    def get(self, repo_id, cache=False):
 
        repo = self.sa.query(Repository)\
 
            .filter(Repository.repo_id == repo_id)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_id))
 
        return repo.scalar()
 

	
 
    def get_repo(self, repository):
 
        return self._get_repo(repository)
 

	
 
    def get_by_repo_name(self, repo_name, cache=False):
 
        repo = self.sa.query(Repository)\
 
            .filter(Repository.repo_name == repo_name)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_name))
 
        return repo.scalar()
 

	
 
    def get_all_user_repos(self, user):
 
        """
 
        Get's all repositories that user have at least read access
 

	
 
        :param user:
 
        :type user:
 
        """
 
        from rhodecode.lib.auth import AuthUser
 
        user = self._get_user(user)
 
        repos = AuthUser(user_id=user.user_id).permissions['repositories']
 
        access_check = lambda r: r[1] in ['repository.read',
 
                                          'repository.write',
 
                                          'repository.admin']
 
        repos = [x[0] for x in filter(access_check, repos.items())]
 
        return Repository.query().filter(Repository.repo_name.in_(repos))
 

	
 
    def get_users_js(self):
 
        users = self.sa.query(User).filter(User.active == True).all()
 
        return json.dumps([
 
            {
 
             'id': u.user_id,
 
             'fname': u.name,
 
             'lname': u.lastname,
 
             'nname': u.username,
 
             'gravatar_lnk': h.gravatar_url(u.email, 14)
 
            } for u in users]
 
        )
 

	
 
    def get_users_groups_js(self):
 
        users_groups = self.sa.query(UsersGroup)\
 
            .filter(UsersGroup.users_group_active == True).all()
 

	
 
        return json.dumps([
 
            {
 
             'id': gr.users_group_id,
 
             'grname': gr.users_group_name,
 
             'grmembers': len(gr.members),
 
            } for gr in users_groups]
 
        )
 

	
 
    @classmethod
 
    def _render_datatable(cls, tmpl, *args, **kwargs):
 
        import rhodecode
 
        from pylons import tmpl_context as c
 
        from pylons.i18n.translation import _
 

	
 
        _tmpl_lookup = rhodecode.CONFIG['pylons.app_globals'].mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        tmpl = template.get_def(tmpl)
 
        kwargs.update(dict(_=_, h=h, c=c))
 
        return tmpl.render(*args, **kwargs)
 

	
 
    @classmethod
 
    def update_repoinfo(cls, repositories=None):
 
        if not repositories:
 
            repositories = Repository.getAll()
 
        for repo in repositories:
 
            scm_repo = repo.scm_instance_no_cache
 
            last_cs = EmptyChangeset()
 
            if scm_repo:
 
                last_cs = scm_repo.get_changeset()
 
            repo.update_changeset_cache(last_cs)
 

	
 
    def get_repos_as_dict(self, repos_list=None, admin=False, perm_check=True,
 
                          super_user_actions=False):
 
        _render = self._render_datatable
 

	
 
        def quick_menu(repo_name):
 
            return _render('quick_menu', repo_name)
 

	
 
        def repo_lnk(name, rtype, private, fork_of):
 
            return _render('repo_name', name, rtype, private, fork_of,
 
                           short_name=not admin, admin=False)
 

	
 
        def last_change(last_change):
 
            return _render("last_change", last_change)
 

	
 
        def rss_lnk(repo_name):
 
            return _render("rss", repo_name)
 

	
 
        def atom_lnk(repo_name):
 
            return _render("atom", repo_name)
 

	
 
        def last_rev(repo_name, cs_cache):
 
            return _render('revision', repo_name, cs_cache.get('revision'),
 
                           cs_cache.get('raw_id'), cs_cache.get('author'),
 
                           cs_cache.get('message'))
 

	
 
        def desc(desc):
 
            from pylons import tmpl_context as c
 
            if c.visual.stylify_metatags:
 
                return h.urlify_text(h.desc_stylize(h.truncate(desc, 60)))
 
            else:
 
                return h.urlify_text(h.truncate(desc, 60))
 

	
 
        def repo_actions(repo_name):
 
            return _render('repo_actions', repo_name, super_user_actions)
 

	
 
        def owner_actions(user_id, username):
 
            return _render('user_name', user_id, username)
 

	
 
        repos_data = []
 
        for repo in repos_list:
 
            if perm_check:
 
                # check permission at this level
 
                if not HasRepoPermissionAny(
 
                    'repository.read', 'repository.write', 'repository.admin'
 
                )(repo.repo_name, 'get_repos_as_dict check'):
 
                    continue
 
            cs_cache = repo.changeset_cache
 
            row = {
 
                "menu": quick_menu(repo.repo_name),
 
                "raw_name": repo.repo_name.lower(),
 
                "name": repo_lnk(repo.repo_name, repo.repo_type,
 
                                 repo.private, repo.fork),
 
                "last_change": last_change(repo.last_db_change),
 
                "last_changeset": last_rev(repo.repo_name, cs_cache),
 
                "raw_tip": cs_cache.get('revision'),
 
                "desc": desc(repo.description),
 
                "owner": h.person(repo.user.username),
 
                "rss": rss_lnk(repo.repo_name),
 
                "atom": atom_lnk(repo.repo_name),
 

	
 
            }
 
            if admin:
 
                row.update({
 
                    "action": repo_actions(repo.repo_name),
 
                    "owner": owner_actions(repo.user.user_id,
 
                                           h.person(repo.user.username))
 
                })
 
            repos_data.append(row)
 

	
 
        return {
 
            "totalRecords": len(repos_list),
 
            "startIndex": 0,
 
            "sort": "name",
 
            "dir": "asc",
 
            "records": repos_data
 
        }
 

	
 
    def _get_defaults(self, repo_name):
 
        """
 
        Get's information about repository, and returns a dict for
 
        usage in forms
 

	
 
        :param repo_name:
 
        """
 

	
 
        repo_info = Repository.get_by_repo_name(repo_name)
 

	
 
        if repo_info is None:
 
            return None
 

	
 
        defaults = repo_info.get_dict()
 
        group, repo_name = repo_info.groups_and_repo
 
        defaults['repo_name'] = repo_name
 
        defaults['repo_group'] = getattr(group[-1] if group else None,
 
                                         'group_id', None)
 

	
 
        for strip, k in [(0, 'repo_type'), (1, 'repo_enable_downloads'),
 
                  (1, 'repo_description'), (1, 'repo_enable_locking'),
 
                  (1, 'repo_landing_rev'), (0, 'clone_uri'),
 
                  (1, 'repo_private'), (1, 'repo_enable_statistics')]:
 
            attr = k
 
            if strip:
 
                attr = remove_prefix(k, 'repo_')
 

	
 
            defaults[k] = defaults[attr]
 

	
 
        # fill owner
 
        if repo_info.user:
 
            defaults.update({'user': repo_info.user.username})
 
        else:
 
            replacement_user = User.query().filter(User.admin ==
 
                                                   True).first().username
 
            defaults.update({'user': replacement_user})
 

	
 
        # fill repository users
 
        for p in repo_info.repo_to_perm:
 
            defaults.update({'u_perm_%s' % p.user.username:
 
                             p.permission.permission_name})
 

	
 
        # fill repository groups
 
        for p in repo_info.users_group_to_perm:
 
            defaults.update({'g_perm_%s' % p.users_group.users_group_name:
 
                             p.permission.permission_name})
 

	
 
        return defaults
 

	
 
    def update(self, org_repo_name, **kwargs):
 
        try:
 
            cur_repo = self.get_by_repo_name(org_repo_name, cache=False)
 

	
 
            # update permissions
 
            for member, perm, member_type in kwargs['perms_updates']:
 
                if member_type == 'user':
 
                    # this updates existing one
 
                    RepoModel().grant_user_permission(
 
                        repo=cur_repo, user=member, perm=perm
 
                    )
 
                else:
 
                    RepoModel().grant_users_group_permission(
 
                        repo=cur_repo, group_name=member, perm=perm
 
                    )
 
            # set new permissions
 
            for member, perm, member_type in kwargs['perms_new']:
 
                if member_type == 'user':
 
                    RepoModel().grant_user_permission(
 
                        repo=cur_repo, user=member, perm=perm
 
                    )
 
                else:
 
                    RepoModel().grant_users_group_permission(
 
                        repo=cur_repo, group_name=member, perm=perm
 
                    )
 

	
 
            if 'user' in kwargs:
 
                cur_repo.user = User.get_by_username(kwargs['user'])
 

	
 
            if 'repo_group' in kwargs:
 
                cur_repo.group = RepoGroup.get(kwargs['repo_group'])
 

	
 
            for strip, k in [(0, 'repo_type'), (1, 'repo_enable_downloads'),
 
                      (1, 'repo_description'), (1, 'repo_enable_locking'),
 
                      (1, 'repo_landing_rev'), (0, 'clone_uri'),
 
                      (1, 'repo_private'), (1, 'repo_enable_statistics')]:
 
                if k in kwargs:
 
                    val = kwargs[k]
 
                    if strip:
 
                        k = remove_prefix(k, 'repo_')
 
                    setattr(cur_repo, k, val)
 

	
 
            new_name = cur_repo.get_new_name(kwargs['repo_name'])
 
            cur_repo.repo_name = new_name
 

	
 
            #handle extra fields
 
            for field in filter(lambda k: k.startswith(RepositoryField.PREFIX), kwargs):
 
                k = RepositoryField.un_prefix_key(field)
 
                ex_field = RepositoryField.get_by_key_name(key=k, repo=cur_repo)
 
                if ex_field:
 
                    ex_field.field_value = kwargs[field]
 
                    self.sa.add(ex_field)
 
            self.sa.add(cur_repo)
 

	
 
            if org_repo_name != new_name:
 
                # rename repository
 
                self.__rename_repo(old=org_repo_name, new=new_name)
 

	
 
            return cur_repo
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_repo(self, repo_name, repo_type, description, owner,
 
                    private=False, clone_uri=None, repos_group=None,
 
                    landing_rev='tip', just_db=False, fork_of=None,
 
                    copy_fork_permissions=False, enable_statistics=False,
 
                    enable_locking=False, enable_downloads=False):
 
        """
 
        Create repository
 

	
 
        """
 
        from rhodecode.model.scm import ScmModel
 

	
 
        owner = self._get_user(owner)
 
        fork_of = self._get_repo(fork_of)
 
        repos_group = self._get_repos_group(repos_group)
 
        try:
 

	
 
            # repo name is just a name of repository
 
            # while repo_name_full is a full qualified name that is combined
 
            # with name and path of group
 
            repo_name_full = repo_name
 
            repo_name = repo_name.split(self.URL_SEPARATOR)[-1]
 

	
 
            new_repo = Repository()
 
            new_repo.enable_statistics = False
 
            new_repo.repo_name = repo_name_full
 
            new_repo.repo_type = repo_type
 
            new_repo.user = owner
 
            new_repo.group = repos_group
 
            new_repo.description = description or repo_name
 
            new_repo.private = private
 
            new_repo.clone_uri = clone_uri
 
            new_repo.landing_rev = landing_rev
 

	
 
            new_repo.enable_statistics = enable_statistics
 
            new_repo.enable_locking = enable_locking
 
            new_repo.enable_downloads = enable_downloads
 

	
 
            if repos_group:
 
                new_repo.enable_locking = repos_group.enable_locking
 

	
 
            if fork_of:
 
                parent_repo = fork_of
 
                new_repo.fork = parent_repo
 

	
 
            self.sa.add(new_repo)
 

	
 
            def _create_default_perms():
 
                # create default permission
 
                repo_to_perm = UserRepoToPerm()
 
                default = 'repository.read'
 
                for p in User.get_by_username('default').user_perms:
 
                    if p.permission.permission_name.startswith('repository.'):
 
                        default = p.permission.permission_name
 
                        break
 

	
 
                default_perm = 'repository.none' if private else default
 

	
 
                repo_to_perm.permission_id = self.sa.query(Permission)\
 
                        .filter(Permission.permission_name == default_perm)\
 
                        .one().permission_id
 

	
 
                repo_to_perm.repository = new_repo
 
                repo_to_perm.user_id = User.get_by_username('default').user_id
 

	
 
                self.sa.add(repo_to_perm)
 

	
 
            if fork_of:
 
                if copy_fork_permissions:
 
                    repo = fork_of
 
                    user_perms = UserRepoToPerm.query()\
 
                        .filter(UserRepoToPerm.repository == repo).all()
 
                    group_perms = UsersGroupRepoToPerm.query()\
 
                        .filter(UsersGroupRepoToPerm.repository == repo).all()
 

	
 
                    for perm in user_perms:
 
                        UserRepoToPerm.create(perm.user, new_repo,
 
                                              perm.permission)
 

	
 
                    for perm in group_perms:
 
                        UsersGroupRepoToPerm.create(perm.users_group, new_repo,
 
                                                    perm.permission)
 
                else:
 
                    _create_default_perms()
 
            else:
 
                _create_default_perms()
 

	
 
            if not just_db:
 
                self.__create_repo(repo_name, repo_type,
 
                                   repos_group,
 
                                   clone_uri)
 
                log_create_repository(new_repo.get_dict(),
 
                                      created_by=owner.username)
 

	
 
            # now automatically start following this repository as owner
 
            ScmModel(self.sa).toggle_following_repo(new_repo.repo_id,
 
                                                    owner.user_id)
 
            return new_repo
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create(self, form_data, cur_user, just_db=False, fork=None):
 
        """
 
        Backward compatibility function, just a wrapper on top of create_repo
 

	
 
        :param form_data:
 
        :param cur_user:
 
        :param just_db:
 
        :param fork:
 
        """
 
        owner = cur_user
 
        repo_name = form_data['repo_name_full']
 
        repo_type = form_data['repo_type']
 
        description = form_data['repo_description']
 
        private = form_data['repo_private']
 
        clone_uri = form_data.get('clone_uri')
 
        repos_group = form_data['repo_group']
 
        landing_rev = form_data['repo_landing_rev']
 
        copy_fork_permissions = form_data.get('copy_permissions')
 
        fork_of = form_data.get('fork_parent_id')
 

	
 
        ## repo creation defaults, private and repo_type are filled in form
 
        defs = RhodeCodeSetting.get_default_repo_settings(strip_prefix=True)
 
        enable_statistics = defs.get('repo_enable_statistics')
 
        enable_locking = defs.get('repo_enable_locking')
 
        enable_downloads = defs.get('repo_enable_downloads')
 

	
 
        return self.create_repo(
 
            repo_name, repo_type, description, owner, private, clone_uri,
 
            repos_group, landing_rev, just_db, fork_of, copy_fork_permissions,
 
            enable_statistics, enable_locking, enable_downloads
 
        )
 

	
 
    def create_fork(self, form_data, cur_user):
 
        """
 
        Simple wrapper into executing celery task for fork creation
 

	
 
        :param form_data:
 
        :param cur_user:
 
        """
 
        from rhodecode.lib.celerylib import tasks, run_task
 
        run_task(tasks.create_repo_fork, form_data, cur_user)
 

	
 
    def delete(self, repo):
 
        repo = self._get_repo(repo)
 
        if repo:
 
            old_repo_dict = repo.get_dict()
 
            owner = repo.user
 
            try:
 
                self.sa.delete(repo)
 
                self.__delete_repo(repo)
 
                log_delete_repository(old_repo_dict,
 
                                      deleted_by=owner.username)
 
            except:
 
                log.error(traceback.format_exc())
 
                raise
 

	
 
    def grant_user_permission(self, repo, user, perm):
 
        """
 
        Grant permission for user on given repository, or update existing one
 
        if found
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        user = self._get_user(user)
 
        repo = self._get_repo(repo)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserRepoToPerm)\
 
            .filter(UserRepoToPerm.user == user)\
 
            .filter(UserRepoToPerm.repository == repo)\
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserRepoToPerm()
 
        obj.repository = repo
 
        obj.user = user
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, user, repo))
 

	
 
    def revoke_user_permission(self, repo, user):
 
        """
 
        Revoke permission for user on given repository
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        user = self._get_user(user)
 
        repo = self._get_repo(repo)
 

	
 
        obj = self.sa.query(UserRepoToPerm)\
 
            .filter(UserRepoToPerm.repository == repo)\
 
            .filter(UserRepoToPerm.user == user)\
 
            .scalar()
 
        if obj:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm on %s on %s' % (repo, user))
 

	
 
    def grant_users_group_permission(self, repo, group_name, perm):
 
        """
 
        Grant permission for users group on given repository, or update
 
        Grant permission for user group on given repository, or update
 
        existing one if found
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
            or user group name
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        repo = self._get_repo(repo)
 
        group_name = self.__get_users_group(group_name)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UsersGroupRepoToPerm)\
 
            .filter(UsersGroupRepoToPerm.users_group == group_name)\
 
            .filter(UsersGroupRepoToPerm.repository == repo)\
 
            .scalar()
 

	
 
        if obj is None:
 
            # create new
 
            obj = UsersGroupRepoToPerm()
 

	
 
        obj.repository = repo
 
        obj.users_group = group_name
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, group_name, repo))
 

	
 
    def revoke_users_group_permission(self, repo, group_name):
 
        """
 
        Revoke permission for users group on given repository
 
        Revoke permission for user group on given repository
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
            or user group name
 
        """
 
        repo = self._get_repo(repo)
 
        group_name = self.__get_users_group(group_name)
 

	
 
        obj = self.sa.query(UsersGroupRepoToPerm)\
 
            .filter(UsersGroupRepoToPerm.repository == repo)\
 
            .filter(UsersGroupRepoToPerm.users_group == group_name)\
 
            .scalar()
 
        if obj:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm to %s on %s' % (repo, group_name))
 

	
 
    def delete_stats(self, repo_name):
 
        """
 
        removes stats for given repo
 

	
 
        :param repo_name:
 
        """
 
        repo = self._get_repo(repo_name)
 
        try:
 
            obj = self.sa.query(Statistics)\
 
                    .filter(Statistics.repository == repo).scalar()
 
            if obj:
 
                self.sa.delete(obj)
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def __create_repo(self, repo_name, alias, parent, clone_uri=False):
 
        """
 
        makes repository on filesystem. It's group aware means it'll create
 
        a repository within a group, and alter the paths accordingly of
 
        group location
 

	
 
        :param repo_name:
 
        :param alias:
 
        :param parent_id:
 
        :param clone_uri:
 
        """
 
        from rhodecode.lib.utils import is_valid_repo, is_valid_repos_group
 
        from rhodecode.model.scm import ScmModel
 

	
 
        if parent:
 
            new_parent_path = os.sep.join(parent.full_path_splitted)
 
        else:
 
            new_parent_path = ''
 

	
 
        # we need to make it str for mercurial
 
        repo_path = os.path.join(*map(lambda x: safe_str(x),
 
                                [self.repos_path, new_parent_path, repo_name]))
 

	
 
        # check if this path is not a repository
 
        if is_valid_repo(repo_path, self.repos_path):
 
            raise Exception('This path %s is a valid repository' % repo_path)
 

	
 
        # check if this path is a group
 
        if is_valid_repos_group(repo_path, self.repos_path):
 
            raise Exception('This path %s is a valid group' % repo_path)
 

	
 
        log.info('creating repo %s in %s @ %s' % (
 
                     repo_name, safe_unicode(repo_path), clone_uri
 
                )
 
        )
 
        backend = get_backend(alias)
 
        if alias == 'hg':
 
            backend(repo_path, create=True, src_url=clone_uri)
 
        elif alias == 'git':
 
            r = backend(repo_path, create=True, src_url=clone_uri, bare=True)
 
            # add rhodecode hook into this repo
 
            ScmModel().install_git_hook(repo=r)
 
        else:
 
            raise Exception('Undefined alias %s' % alias)
 

	
 
    def __rename_repo(self, old, new):
 
        """
 
        renames repository on filesystem
 

	
 
        :param old: old name
 
        :param new: new name
 
        """
 
        log.info('renaming repo from %s to %s' % (old, new))
 

	
 
        old_path = os.path.join(self.repos_path, old)
 
        new_path = os.path.join(self.repos_path, new)
 
        if os.path.isdir(new_path):
 
            raise Exception(
 
                'Was trying to rename to already existing dir %s' % new_path
 
            )
 
        shutil.move(old_path, new_path)
 

	
 
    def __delete_repo(self, repo):
 
        """
 
        removes repo from filesystem, the removal is acctually made by
 
        added rm__ prefix into dir, and rename internat .hg/.git dirs so this
 
        repository is no longer valid for rhodecode, can be undeleted later on
 
        by reverting the renames on this repository
 

	
 
        :param repo: repo object
 
        """
 
        rm_path = os.path.join(self.repos_path, repo.repo_name)
 
        log.info("Removing %s" % (rm_path))
 
        # disable hg/git internal that it doesn't get detected as repo
 
        alias = repo.repo_type
 

	
 
        bare = getattr(repo.scm_instance, 'bare', False)
 

	
 
        if not bare:
 
            # skip this for bare git repos
 
            shutil.move(os.path.join(rm_path, '.%s' % alias),
 
                        os.path.join(rm_path, 'rm__.%s' % alias))
 
        # disable repo
 
        _now = datetime.now()
 
        _ms = str(_now.microsecond).rjust(6, '0')
 
        _d = 'rm__%s__%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                             repo.just_name)
 
        if repo.group:
 
            args = repo.group.full_path_splitted + [_d]
 
            _d = os.path.join(*args)
 
        shutil.move(rm_path, os.path.join(self.repos_path, _d))
rhodecode/model/user.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.model.user
 
    ~~~~~~~~~~~~~~~~~~~~
 

	
 
    users model for RhodeCode
 

	
 
    :created_on: Apr 9, 2010
 
    :author: marcink
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import traceback
 
import itertools
 
import collections
 
from pylons import url
 
from pylons.i18n.translation import _
 

	
 
from sqlalchemy.exc import DatabaseError
 
from sqlalchemy.orm import joinedload
 

	
 
from rhodecode.lib.utils2 import safe_unicode, generate_api_key
 
from rhodecode.lib.caching_query import FromCache
 
from rhodecode.model import BaseModel
 
from rhodecode.model.db import User, UserRepoToPerm, Repository, Permission, \
 
    UserToPerm, UsersGroupRepoToPerm, UsersGroupToPerm, UsersGroupMember, \
 
    Notification, RepoGroup, UserRepoGroupToPerm, UsersGroupRepoGroupToPerm, \
 
    UserEmailMap, UserIpMap
 
from rhodecode.lib.exceptions import DefaultUserException, \
 
    UserOwnsReposException
 
from rhodecode.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
PERM_WEIGHTS = Permission.PERM_WEIGHTS
 

	
 

	
 
class UserModel(BaseModel):
 
    cls = User
 

	
 
    def get(self, user_id, cache=False):
 
        user = self.sa.query(User)
 
        if cache:
 
            user = user.options(FromCache("sql_cache_short",
 
                                          "get_user_%s" % user_id))
 
        return user.get(user_id)
 

	
 
    def get_user(self, user):
 
        return self._get_user(user)
 

	
 
    def get_by_username(self, username, cache=False, case_insensitive=False):
 

	
 
        if case_insensitive:
 
            user = self.sa.query(User).filter(User.username.ilike(username))
 
        else:
 
            user = self.sa.query(User)\
 
                .filter(User.username == username)
 
        if cache:
 
            user = user.options(FromCache("sql_cache_short",
 
                                          "get_user_%s" % username))
 
        return user.scalar()
 

	
 
    def get_by_email(self, email, cache=False, case_insensitive=False):
 
        return User.get_by_email(email, case_insensitive, cache)
 

	
 
    def get_by_api_key(self, api_key, cache=False):
 
        return User.get_by_api_key(api_key, cache)
 

	
 
    def create(self, form_data):
 
        from rhodecode.lib.auth import get_crypt_password
 
        try:
 
            new_user = User()
 
            for k, v in form_data.items():
 
                if k == 'password':
 
                    v = get_crypt_password(v)
 
                if k == 'firstname':
 
                    k = 'name'
 
                setattr(new_user, k, v)
 

	
 
            new_user.api_key = generate_api_key(form_data['username'])
 
            self.sa.add(new_user)
 
            return new_user
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_or_update(self, username, password, email, firstname='',
 
                         lastname='', active=True, admin=False, ldap_dn=None):
 
        """
 
        Creates a new instance if not found, or updates current one
 

	
 
        :param username:
 
        :param password:
 
        :param email:
 
        :param active:
 
        :param firstname:
 
        :param lastname:
 
        :param active:
 
        :param admin:
 
        :param ldap_dn:
 
        """
 

	
 
        from rhodecode.lib.auth import get_crypt_password
 

	
 
        log.debug('Checking for %s account in RhodeCode database' % username)
 
        user = User.get_by_username(username, case_insensitive=True)
 
        if user is None:
 
            log.debug('creating new user %s' % username)
 
            new_user = User()
 
            edit = False
 
        else:
 
            log.debug('updating user %s' % username)
 
            new_user = user
 
            edit = True
 

	
 
        try:
 
            new_user.username = username
 
            new_user.admin = admin
 
            # set password only if creating an user or password is changed
 
            if edit is False or user.password != password:
 
                new_user.password = get_crypt_password(password)
 
                new_user.api_key = generate_api_key(username)
 
            new_user.email = email
 
            new_user.active = active
 
            new_user.ldap_dn = safe_unicode(ldap_dn) if ldap_dn else None
 
            new_user.name = firstname
 
            new_user.lastname = lastname
 
            self.sa.add(new_user)
 
            return new_user
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_for_container_auth(self, username, attrs):
 
        """
 
        Creates the given user if it's not already in the database
 

	
 
        :param username:
 
        :param attrs:
 
        """
 
        if self.get_by_username(username, case_insensitive=True) is None:
 

	
 
            # autogenerate email for container account without one
 
            generate_email = lambda usr: '%s@container_auth.account' % usr
 

	
 
            try:
 
                new_user = User()
 
                new_user.username = username
 
                new_user.password = None
 
                new_user.api_key = generate_api_key(username)
 
                new_user.email = attrs['email']
 
                new_user.active = attrs.get('active', True)
 
                new_user.name = attrs['name'] or generate_email(username)
 
                new_user.lastname = attrs['lastname']
 

	
 
                self.sa.add(new_user)
 
                return new_user
 
            except (DatabaseError,):
 
                log.error(traceback.format_exc())
 
                self.sa.rollback()
 
                raise
 
        log.debug('User %s already exists. Skipping creation of account'
 
                  ' for container auth.', username)
 
        return None
 

	
 
    def create_ldap(self, username, password, user_dn, attrs):
 
        """
 
        Checks if user is in database, if not creates this user marked
 
        as ldap user
 

	
 
        :param username:
 
        :param password:
 
        :param user_dn:
 
        :param attrs:
 
        """
 
        from rhodecode.lib.auth import get_crypt_password
 
        log.debug('Checking for such ldap account in RhodeCode database')
 
        if self.get_by_username(username, case_insensitive=True) is None:
 

	
 
            # autogenerate email for ldap account without one
 
            generate_email = lambda usr: '%s@ldap.account' % usr
 

	
 
            try:
 
                new_user = User()
 
                username = username.lower()
 
                # add ldap account always lowercase
 
                new_user.username = username
 
                new_user.password = get_crypt_password(password)
 
                new_user.api_key = generate_api_key(username)
 
                new_user.email = attrs['email'] or generate_email(username)
 
                new_user.active = attrs.get('active', True)
 
                new_user.ldap_dn = safe_unicode(user_dn)
 
                new_user.name = attrs['name']
 
                new_user.lastname = attrs['lastname']
 

	
 
                self.sa.add(new_user)
 
                return new_user
 
            except (DatabaseError,):
 
                log.error(traceback.format_exc())
 
                self.sa.rollback()
 
                raise
 
        log.debug('this %s user exists skipping creation of ldap account',
 
                  username)
 
        return None
 

	
 
    def create_registration(self, form_data):
 
        from rhodecode.model.notification import NotificationModel
 

	
 
        try:
 
            form_data['admin'] = False
 
            new_user = self.create(form_data)
 

	
 
            self.sa.add(new_user)
 
            self.sa.flush()
 

	
 
            # notification to admins
 
            subject = _('new user registration')
 
            body = ('New user registration\n'
 
                    '---------------------\n'
 
                    '- Username: %s\n'
 
                    '- Full Name: %s\n'
 
                    '- Email: %s\n')
 
            body = body % (new_user.username, new_user.full_name,
 
                           new_user.email)
 
            edit_url = url('edit_user', id=new_user.user_id, qualified=True)
 
            kw = {'registered_user_url': edit_url}
 
            NotificationModel().create(created_by=new_user, subject=subject,
 
                                       body=body, recipients=None,
 
                                       type_=Notification.TYPE_REGISTRATION,
 
                                       email_kwargs=kw)
 

	
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def update(self, user_id, form_data, skip_attrs=[]):
 
        from rhodecode.lib.auth import get_crypt_password
 
        try:
 
            user = self.get(user_id, cache=False)
 
            if user.username == 'default':
 
                raise DefaultUserException(
 
                                _("You can't Edit this user since it's"
 
                                  " crucial for entire application"))
 

	
 
            for k, v in form_data.items():
 
                if k in skip_attrs:
 
                    continue
 
                if k == 'new_password' and v:
 
                    user.password = get_crypt_password(v)
 
                    user.api_key = generate_api_key(user.username)
 
                else:
 
                    if k == 'firstname':
 
                        k = 'name'
 
                    setattr(user, k, v)
 
            self.sa.add(user)
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def update_user(self, user, **kwargs):
 
        from rhodecode.lib.auth import get_crypt_password
 
        try:
 
            user = self._get_user(user)
 
            if user.username == 'default':
 
                raise DefaultUserException(
 
                    _("You can't Edit this user since it's"
 
                      " crucial for entire application")
 
                )
 

	
 
            for k, v in kwargs.items():
 
                if k == 'password' and v:
 
                    v = get_crypt_password(v)
 
                    user.api_key = generate_api_key(user.username)
 

	
 
                setattr(user, k, v)
 
            self.sa.add(user)
 
            return user
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, user):
 
        user = self._get_user(user)
 

	
 
        try:
 
            if user.username == 'default':
 
                raise DefaultUserException(
 
                    _(u"You can't remove this user since it's"
 
                      " crucial for entire application")
 
                )
 
            if user.repositories:
 
                repos = [x.repo_name for x in user.repositories]
 
                raise UserOwnsReposException(
 
                    _(u'user "%s" still owns %s repositories and cannot be '
 
                      'removed. Switch owners or remove those repositories. %s')
 
                    % (user.username, len(repos), ', '.join(repos))
 
                )
 
            self.sa.delete(user)
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def reset_password_link(self, data):
 
        from rhodecode.lib.celerylib import tasks, run_task
 
        from rhodecode.model.notification import EmailNotificationModel
 
        user_email = data['email']
 
        try:
 
            user = User.get_by_email(user_email)
 
            if user:
 
                log.debug('password reset user found %s' % user)
 
                link = url('reset_password_confirmation', key=user.api_key,
 
                           qualified=True)
 
                reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
 
                body = EmailNotificationModel().get_email_tmpl(reg_type,
 
                                                    **{'user': user.short_contact,
 
                                                       'reset_url': link})
 
                log.debug('sending email')
 
                run_task(tasks.send_email, user_email,
 
                         _("password reset link"), body, body)
 
                log.info('send new password mail to %s' % user_email)
 
            else:
 
                log.debug("password reset email %s not found" % user_email)
 
        except:
 
            log.error(traceback.format_exc())
 
            return False
 

	
 
        return True
 

	
 
    def reset_password(self, data):
 
        from rhodecode.lib.celerylib import tasks, run_task
 
        from rhodecode.lib import auth
 
        user_email = data['email']
 
        try:
 
            try:
 
                user = User.get_by_email(user_email)
 
                new_passwd = auth.PasswordGenerator().gen_password(8,
 
                                 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
 
                if user:
 
                    user.password = auth.get_crypt_password(new_passwd)
 
                    user.api_key = auth.generate_api_key(user.username)
 
                    Session().add(user)
 
                    Session().commit()
 
                    log.info('change password for %s' % user_email)
 
                if new_passwd is None:
 
                    raise Exception('unable to generate new password')
 
            except:
 
                log.error(traceback.format_exc())
 
                Session().rollback()
 

	
 
            run_task(tasks.send_email, user_email,
 
                     _('Your new password'),
 
                     _('Your new RhodeCode password:%s') % (new_passwd))
 
            log.info('send new password mail to %s' % user_email)
 

	
 
        except:
 
            log.error('Failed to update user password')
 
            log.error(traceback.format_exc())
 

	
 
        return True
 

	
 
    def fill_data(self, auth_user, user_id=None, api_key=None):
 
        """
 
        Fetches auth_user by user_id,or api_key if present.
 
        Fills auth_user attributes with those taken from database.
 
        Additionally set's is_authenitated if lookup fails
 
        present in database
 

	
 
        :param auth_user: instance of user to set attributes
 
        :param user_id: user id to fetch by
 
        :param api_key: api key to fetch by
 
        """
 
        if user_id is None and api_key is None:
 
            raise Exception('You need to pass user_id or api_key')
 

	
 
        try:
 
            if api_key:
 
                dbuser = self.get_by_api_key(api_key)
 
            else:
 
                dbuser = self.get(user_id)
 

	
 
            if dbuser is not None and dbuser.active:
 
                log.debug('filling %s data' % dbuser)
 
                for k, v in dbuser.get_dict().items():
 
                    setattr(auth_user, k, v)
 
            else:
 
                return False
 

	
 
        except:
 
            log.error(traceback.format_exc())
 
            auth_user.is_authenticated = False
 
            return False
 

	
 
        return True
 

	
 
    def fill_perms(self, user, explicit=True, algo='higherwin'):
 
        """
 
        Fills user permission attribute with permissions taken from database
 
        works for permissions given for repositories, and for permissions that
 
        are granted to groups
 

	
 
        :param user: user instance to fill his perms
 
        :param explicit: In case there are permissions both for user and a group
 
            that user is part of, explicit flag will defiine if user will
 
            explicitly override permissions from group, if it's False it will
 
            make decision based on the algo
 
        :param algo: algorithm to decide what permission should be choose if
 
            it's multiple defined, eg user in two different groups. It also
 
            decides if explicit flag is turned off how to specify the permission
 
            for case when user is in a group + have defined separate permission
 
        """
 
        RK = 'repositories'
 
        GK = 'repositories_groups'
 
        GLOBAL = 'global'
 
        user.permissions[RK] = {}
 
        user.permissions[GK] = {}
 
        user.permissions[GLOBAL] = set()
 

	
 
        def _choose_perm(new_perm, cur_perm):
 
            new_perm_val = PERM_WEIGHTS[new_perm]
 
            cur_perm_val = PERM_WEIGHTS[cur_perm]
 
            if algo == 'higherwin':
 
                if new_perm_val > cur_perm_val:
 
                    return new_perm
 
                return cur_perm
 
            elif algo == 'lowerwin':
 
                if new_perm_val < cur_perm_val:
 
                    return new_perm
 
                return cur_perm
 

	
 
        #======================================================================
 
        # fetch default permissions
 
        #======================================================================
 
        default_user = User.get_by_username('default', cache=True)
 
        default_user_id = default_user.user_id
 

	
 
        default_repo_perms = Permission.get_default_perms(default_user_id)
 
        default_repo_groups_perms = Permission.get_default_group_perms(default_user_id)
 

	
 
        if user.is_admin:
 
            #==================================================================
 
            # admin user have all default rights for repositories
 
            # and groups set to admin
 
            #==================================================================
 
            user.permissions[GLOBAL].add('hg.admin')
 

	
 
            # repositories
 
            for perm in default_repo_perms:
 
                r_k = perm.UserRepoToPerm.repository.repo_name
 
                p = 'repository.admin'
 
                user.permissions[RK][r_k] = p
 

	
 
            # repository groups
 
            for perm in default_repo_groups_perms:
 
                rg_k = perm.UserRepoGroupToPerm.group.group_name
 
                p = 'group.admin'
 
                user.permissions[GK][rg_k] = p
 
            return user
 

	
 
        #==================================================================
 
        # SET DEFAULTS GLOBAL, REPOS, REPOS GROUPS
 
        #==================================================================
 
        uid = user.user_id
 

	
 
        # default global permissions taken fron the default user
 
        default_global_perms = self.sa.query(UserToPerm)\
 
            .filter(UserToPerm.user_id == default_user_id)
 

	
 
        for perm in default_global_perms:
 
            user.permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
        # defaults for repositories, taken from default user
 
        for perm in default_repo_perms:
 
            r_k = perm.UserRepoToPerm.repository.repo_name
 
            if perm.Repository.private and not (perm.Repository.user_id == uid):
 
                # disable defaults for private repos,
 
                p = 'repository.none'
 
            elif perm.Repository.user_id == uid:
 
                # set admin if owner
 
                p = 'repository.admin'
 
            else:
 
                p = perm.Permission.permission_name
 

	
 
            user.permissions[RK][r_k] = p
 

	
 
        # defaults for repository groups taken from default user permission
 
        # on given group
 
        for perm in default_repo_groups_perms:
 
            rg_k = perm.UserRepoGroupToPerm.group.group_name
 
            p = perm.Permission.permission_name
 
            user.permissions[GK][rg_k] = p
 

	
 
        #======================================================================
 
        # !! OVERRIDE GLOBALS !! with user permissions if any found
 
        #======================================================================
 
        # those can be configured from groups or users explicitly
 
        _configurable = set(['hg.fork.none', 'hg.fork.repository',
 
                             'hg.create.none', 'hg.create.repository'])
 

	
 
        # USER GROUPS comes first
 
        # users group global permissions
 
        # user group global permissions
 
        user_perms_from_users_groups = self.sa.query(UsersGroupToPerm)\
 
            .options(joinedload(UsersGroupToPerm.permission))\
 
            .join((UsersGroupMember, UsersGroupToPerm.users_group_id ==
 
                   UsersGroupMember.users_group_id))\
 
            .filter(UsersGroupMember.user_id == uid)\
 
            .order_by(UsersGroupToPerm.users_group_id)\
 
            .all()
 
        #need to group here by groups since user can be in more than one group
 
        _grouped = [[x, list(y)] for x, y in
 
                    itertools.groupby(user_perms_from_users_groups,
 
                                      lambda x:x.users_group)]
 
        for gr, perms in _grouped:
 
            # since user can be in multiple groups iterate over them and
 
            # select the lowest permissions first (more explicit)
 
            ##TODO: do this^^
 
            if not gr.inherit_default_permissions:
 
                # NEED TO IGNORE all configurable permissions and
 
                # replace them with explicitly set
 
                user.permissions[GLOBAL] = user.permissions[GLOBAL]\
 
                                                .difference(_configurable)
 
            for perm in perms:
 
                user.permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
        # user specific global permissions
 
        user_perms = self.sa.query(UserToPerm)\
 
                .options(joinedload(UserToPerm.permission))\
 
                .filter(UserToPerm.user_id == uid).all()
 

	
 
        if not user.inherit_default_permissions:
 
            # NEED TO IGNORE all configurable permissions and
 
            # replace them with explicitly set
 
            user.permissions[GLOBAL] = user.permissions[GLOBAL]\
 
                                            .difference(_configurable)
 

	
 
            for perm in user_perms:
 
                user.permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
        #======================================================================
 
        # !! PERMISSIONS FOR REPOSITORIES !!
 
        #======================================================================
 
        #======================================================================
 
        # check if user is part of user groups for this repository and
 
        # fill in his permission from it. _choose_perm decides of which
 
        # permission should be selected based on selected method
 
        #======================================================================
 

	
 
        # users group for repositories permissions
 
        # user group for repositories permissions
 
        user_repo_perms_from_users_groups = \
 
         self.sa.query(UsersGroupRepoToPerm, Permission, Repository,)\
 
            .join((Repository, UsersGroupRepoToPerm.repository_id ==
 
                   Repository.repo_id))\
 
            .join((Permission, UsersGroupRepoToPerm.permission_id ==
 
                   Permission.permission_id))\
 
            .join((UsersGroupMember, UsersGroupRepoToPerm.users_group_id ==
 
                   UsersGroupMember.users_group_id))\
 
            .filter(UsersGroupMember.user_id == uid)\
 
            .all()
 

	
 
        multiple_counter = collections.defaultdict(int)
 
        for perm in user_repo_perms_from_users_groups:
 
            r_k = perm.UsersGroupRepoToPerm.repository.repo_name
 
            multiple_counter[r_k] += 1
 
            p = perm.Permission.permission_name
 
            cur_perm = user.permissions[RK][r_k]
 

	
 
            if perm.Repository.user_id == uid:
 
                # set admin if owner
 
                p = 'repository.admin'
 
            else:
 
                if multiple_counter[r_k] > 1:
 
                    p = _choose_perm(p, cur_perm)
 
            user.permissions[RK][r_k] = p
 

	
 
        # user explicit permissions for repositories, overrides any specified
 
        # by the group permission
 
        user_repo_perms = \
 
         self.sa.query(UserRepoToPerm, Permission, Repository)\
 
            .join((Repository, UserRepoToPerm.repository_id ==
 
                   Repository.repo_id))\
 
            .join((Permission, UserRepoToPerm.permission_id ==
 
                   Permission.permission_id))\
 
            .filter(UserRepoToPerm.user_id == uid)\
 
            .all()
 

	
 
        for perm in user_repo_perms:
 
            r_k = perm.UserRepoToPerm.repository.repo_name
 
            cur_perm = user.permissions[RK][r_k]
 
            # set admin if owner
 
            if perm.Repository.user_id == uid:
 
                p = 'repository.admin'
 
            else:
 
                p = perm.Permission.permission_name
 
                if not explicit:
 
                    p = _choose_perm(p, cur_perm)
 
            user.permissions[RK][r_k] = p
 

	
 
        #======================================================================
 
        # !! PERMISSIONS FOR REPOSITORY GROUPS !!
 
        #======================================================================
 
        #======================================================================
 
        # check if user is part of user groups for this repository groups and
 
        # fill in his permission from it. _choose_perm decides of which
 
        # permission should be selected based on selected method
 
        #======================================================================
 
        # users group for repo groups permissions
 
        # user group for repo groups permissions
 
        user_repo_group_perms_from_users_groups = \
 
         self.sa.query(UsersGroupRepoGroupToPerm, Permission, RepoGroup)\
 
         .join((RepoGroup, UsersGroupRepoGroupToPerm.group_id == RepoGroup.group_id))\
 
         .join((Permission, UsersGroupRepoGroupToPerm.permission_id
 
                == Permission.permission_id))\
 
         .join((UsersGroupMember, UsersGroupRepoGroupToPerm.users_group_id
 
                == UsersGroupMember.users_group_id))\
 
         .filter(UsersGroupMember.user_id == uid)\
 
         .all()
 

	
 
        multiple_counter = collections.defaultdict(int)
 
        for perm in user_repo_group_perms_from_users_groups:
 
            g_k = perm.UsersGroupRepoGroupToPerm.group.group_name
 
            multiple_counter[g_k] += 1
 
            p = perm.Permission.permission_name
 
            cur_perm = user.permissions[GK][g_k]
 
            if multiple_counter[g_k] > 1:
 
                p = _choose_perm(p, cur_perm)
 
            user.permissions[GK][g_k] = p
 

	
 
        # user explicit permissions for repository groups
 
        user_repo_groups_perms = \
 
         self.sa.query(UserRepoGroupToPerm, Permission, RepoGroup)\
 
         .join((RepoGroup, UserRepoGroupToPerm.group_id == RepoGroup.group_id))\
 
         .join((Permission, UserRepoGroupToPerm.permission_id
 
                == Permission.permission_id))\
 
         .filter(UserRepoGroupToPerm.user_id == uid)\
 
         .all()
 

	
 
        for perm in user_repo_groups_perms:
 
            rg_k = perm.UserRepoGroupToPerm.group.group_name
 
            p = perm.Permission.permission_name
 
            cur_perm = user.permissions[GK][rg_k]
 
            if not explicit:
 
                p = _choose_perm(p, cur_perm)
 
            user.permissions[GK][rg_k] = p
 

	
 
        return user
 

	
 
    def has_perm(self, user, perm):
 
        perm = self._get_perm(perm)
 
        user = self._get_user(user)
 

	
 
        return UserToPerm.query().filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm).scalar() is not None
 

	
 
    def grant_perm(self, user, perm):
 
        """
 
        Grant user global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = self._get_user(user)
 
        perm = self._get_perm(perm)
 
        # if this permission is already granted skip it
 
        _perm = UserToPerm.query()\
 
            .filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm)\
 
            .scalar()
 
        if _perm:
 
            return
 
        new = UserToPerm()
 
        new.user = user
 
        new.permission = perm
 
        self.sa.add(new)
 

	
 
    def revoke_perm(self, user, perm):
 
        """
 
        Revoke users global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = self._get_user(user)
 
        perm = self._get_perm(perm)
 

	
 
        obj = UserToPerm.query()\
 
                .filter(UserToPerm.user == user)\
 
                .filter(UserToPerm.permission == perm)\
 
                .scalar()
 
        if obj:
 
            self.sa.delete(obj)
 

	
 
    def add_extra_email(self, user, email):
 
        """
 
        Adds email address to UserEmailMap
 

	
 
        :param user:
 
        :param email:
 
        """
 
        from rhodecode.model import forms
 
        form = forms.UserExtraEmailForm()()
 
        data = form.to_python(dict(email=email))
 
        user = self._get_user(user)
 

	
 
        obj = UserEmailMap()
 
        obj.user = user
 
        obj.email = data['email']
 
        self.sa.add(obj)
 
        return obj
 

	
 
    def delete_extra_email(self, user, email_id):
 
        """
 
        Removes email address from UserEmailMap
 

	
 
        :param user:
 
        :param email_id:
 
        """
 
        user = self._get_user(user)
 
        obj = UserEmailMap.query().get(email_id)
 
        if obj:
 
            self.sa.delete(obj)
 

	
 
    def add_extra_ip(self, user, ip):
 
        """
 
        Adds ip address to UserIpMap
 

	
 
        :param user:
 
        :param ip:
 
        """
 
        from rhodecode.model import forms
 
        form = forms.UserExtraIpForm()()
 
        data = form.to_python(dict(ip=ip))
 
        user = self._get_user(user)
 

	
 
        obj = UserIpMap()
 
        obj.user = user
 
        obj.ip_addr = data['ip']
 
        self.sa.add(obj)
 
        return obj
 

	
 
    def delete_extra_ip(self, user, ip_id):
 
        """
 
        Removes ip address from UserIpMap
 

	
 
        :param user:
 
        :param ip_id:
 
        """
 
        user = self._get_user(user)
 
        obj = UserIpMap.query().get(ip_id)
 
        if obj:
 
            self.sa.delete(obj)
rhodecode/model/validators.py
Show inline comments
 
"""
 
Set of generic validators
 
"""
 
import os
 
import re
 
import formencode
 
import logging
 
from collections import defaultdict
 
from pylons.i18n.translation import _
 
from webhelpers.pylonslib.secure_form import authentication_token
 

	
 
from formencode.validators import (
 
    UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
 
    NotEmpty, IPAddress, CIDR
 
)
 
from rhodecode.lib.compat import OrderedSet
 
from rhodecode.lib import ipaddr
 
from rhodecode.lib.utils import repo_name_slug
 
from rhodecode.model.db import RepoGroup, Repository, UsersGroup, User,\
 
    ChangesetStatus
 
from rhodecode.lib.exceptions import LdapImportError
 
from rhodecode.config.routing import ADMIN_PREFIX
 
from rhodecode.lib.auth import HasReposGroupPermissionAny, HasPermissionAny
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UniqueList(formencode.FancyValidator):
 
    """
 
    Unique List !
 
    """
 
    messages = dict(
 
        empty=_('Value cannot be an empty list'),
 
        missing_value=_('Value cannot be an empty list'),
 
    )
 

	
 
    def _to_python(self, value, state):
 
        if isinstance(value, list):
 
            return value
 
        elif isinstance(value, set):
 
            return list(value)
 
        elif isinstance(value, tuple):
 
            return list(value)
 
        elif value is None:
 
            return []
 
        else:
 
            return [value]
 

	
 
    def empty_value(self, value):
 
        return []
 

	
 

	
 
class StateObj(object):
 
    """
 
    this is needed to translate the messages using _() in validators
 
    """
 
    _ = staticmethod(_)
 

	
 

	
 
def M(self, key, state=None, **kwargs):
 
    """
 
    returns string from self.message based on given key,
 
    passed kw params are used to substitute %(named)s params inside
 
    translated strings
 

	
 
    :param msg:
 
    :param state:
 
    """
 
    if state is None:
 
        state = StateObj()
 
    else:
 
        state._ = staticmethod(_)
 
    #inject validator into state object
 
    return self.message(key, state, **kwargs)
 

	
 

	
 
def ValidUsername(edit=False, old_data={}):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'username_exists': _(u'Username "%(username)s" already exists'),
 
            'system_invalid_username':
 
                _(u'Username "%(username)s" is forbidden'),
 
            'invalid_username':
 
                _(u'Username may only contain alphanumeric characters '
 
                  'underscores, periods or dashes and must begin with '
 
                  'alphanumeric character')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value in ['default', 'new_user']:
 
                msg = M(self, 'system_invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state)
 
            #check if user is unique
 
            old_un = None
 
            if edit:
 
                old_un = User.get(old_data.get('user_id')).username
 

	
 
            if old_un != value or not edit:
 
                if User.get_by_username(value, case_insensitive=True):
 
                    msg = M(self, 'username_exists', state, username=value)
 
                    raise formencode.Invalid(msg, value, state)
 

	
 
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]*$', value) is None:
 
                msg = M(self, 'invalid_username', state)
 
                raise formencode.Invalid(msg, value, state)
 
    return _validator
 

	
 

	
 
def ValidRepoUser():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_username': _(u'Username %(username)s is not valid')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                User.query().filter(User.active == True)\
 
                    .filter(User.username == value).one()
 
            except Exception:
 
                msg = M(self, 'invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(username=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidUsersGroup(edit=False, old_data={}):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_group': _(u'Invalid users group name'),
 
            'invalid_group': _(u'Invalid user group name'),
 
            'group_exist': _(u'Users group "%(usersgroup)s" already exists'),
 
            'invalid_usersgroup_name':
 
                _(u'users group name may only contain  alphanumeric '
 
                _(u'user group name may only contain  alphanumeric '
 
                  'characters underscores, periods or dashes and must begin '
 
                  'with alphanumeric character')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value in ['default']:
 
                msg = M(self, 'invalid_group', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(users_group_name=msg)
 
                )
 
            #check if group is unique
 
            old_ugname = None
 
            if edit:
 
                old_id = old_data.get('users_group_id')
 
                old_ugname = UsersGroup.get(old_id).users_group_name
 

	
 
            if old_ugname != value or not edit:
 
                is_existing_group = UsersGroup.get_by_group_name(value,
 
                                                        case_insensitive=True)
 
                if is_existing_group:
 
                    msg = M(self, 'group_exist', state, usersgroup=value)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(users_group_name=msg)
 
                    )
 

	
 
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
 
                msg = M(self, 'invalid_usersgroup_name', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(users_group_name=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidReposGroup(edit=False, old_data={}):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'group_parent_id': _(u'Cannot assign this group as parent'),
 
            'group_exists': _(u'Group "%(group_name)s" already exists'),
 
            'repo_exists':
 
                _(u'Repository with name "%(group_name)s" already exists')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            # TODO WRITE VALIDATIONS
 
            group_name = value.get('group_name')
 
            group_parent_id = value.get('group_parent_id')
 

	
 
            # slugify repo group just in case :)
 
            slug = repo_name_slug(group_name)
 

	
 
            # check for parent of self
 
            parent_of_self = lambda: (
 
                old_data['group_id'] == int(group_parent_id)
 
                if group_parent_id else False
 
            )
 
            if edit and parent_of_self():
 
                msg = M(self, 'group_parent_id', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(group_parent_id=msg)
 
                )
 

	
 
            old_gname = None
 
            if edit:
 
                old_gname = RepoGroup.get(old_data.get('group_id')).group_name
 

	
 
            if old_gname != group_name or not edit:
 

	
 
                # check group
 
                gr = RepoGroup.query()\
 
                      .filter(RepoGroup.group_name == slug)\
 
                      .filter(RepoGroup.group_parent_id == group_parent_id)\
 
                      .scalar()
 

	
 
                if gr:
 
                    msg = M(self, 'group_exists', state, group_name=slug)
 
                    raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(group_name=msg)
 
                    )
 

	
 
                # check for same repo
 
                repo = Repository.query()\
 
                      .filter(Repository.repo_name == slug)\
 
                      .scalar()
 

	
 
                if repo:
 
                    msg = M(self, 'repo_exists', state, group_name=slug)
 
                    raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(group_name=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def ValidPassword():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password':
 
                _(u'Invalid characters (non-ascii) in password')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                (value or '').decode('ascii')
 
            except UnicodeError:
 
                msg = M(self, 'invalid_password', state)
 
                raise formencode.Invalid(msg, value, state,)
 
    return _validator
 

	
 

	
 
def ValidPasswordsMatch():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'password_mismatch': _(u'Passwords do not match'),
 
        }
 

	
 
        def validate_python(self, value, state):
 

	
 
            pass_val = value.get('password') or value.get('new_password')
 
            if pass_val != value['password_confirmation']:
 
                msg = M(self, 'password_mismatch', state)
 
                raise formencode.Invalid(msg, value, state,
 
                     error_dict=dict(password_confirmation=msg)
 
                )
 
    return _validator
 

	
 

	
 
def ValidAuth():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password': _(u'invalid password'),
 
            'invalid_username': _(u'invalid user name'),
 
            'disabled_account': _(u'Your account is disabled')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            from rhodecode.lib.auth import authenticate
 

	
 
            password = value['password']
 
            username = value['username']
 

	
 
            if not authenticate(username, password):
 
                user = User.get_by_username(username)
 
                if user and user.active is False:
 
                    log.warning('user %s is disabled' % username)
 
                    msg = M(self, 'disabled_account', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=msg)
 
                    )
 
                else:
 
                    log.warning('user %s failed to authenticate' % username)
 
                    msg = M(self, 'invalid_username', state)
 
                    msg2 = M(self, 'invalid_password', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=msg, password=msg2)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidAuthToken():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_token': _(u'Token mismatch')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value != authentication_token():
 
                msg = M(self, 'invalid_token', state)
 
                raise formencode.Invalid(msg, value, state)
 
    return _validator
 

	
 

	
 
def ValidRepoName(edit=False, old_data={}):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_repo_name':
 
                _(u'Repository name %(repo)s is disallowed'),
 
            'repository_exists':
 
                _(u'Repository named %(repo)s already exists'),
 
            'repository_in_group_exists': _(u'Repository "%(repo)s" already '
 
                                            'exists in group "%(group)s"'),
 
            'same_group_exists': _(u'Repositories group with name "%(repo)s" '
 
                                   'already exists')
 
        }
 

	
 
        def _to_python(self, value, state):
 
            repo_name = repo_name_slug(value.get('repo_name', ''))
 
            repo_group = value.get('repo_group')
 
            if repo_group:
 
                gr = RepoGroup.get(repo_group)
 
                group_path = gr.full_path
 
                group_name = gr.group_name
 
                # value needs to be aware of group name in order to check
 
                # db key This is an actual just the name to store in the
 
                # database
 
                repo_name_full = group_path + RepoGroup.url_sep() + repo_name
 
            else:
 
                group_name = group_path = ''
 
                repo_name_full = repo_name
 

	
 
            value['repo_name'] = repo_name
 
            value['repo_name_full'] = repo_name_full
 
            value['group_path'] = group_path
 
            value['group_name'] = group_name
 
            return value
 

	
 
        def validate_python(self, value, state):
 

	
 
            repo_name = value.get('repo_name')
 
            repo_name_full = value.get('repo_name_full')
 
            group_path = value.get('group_path')
 
            group_name = value.get('group_name')
 

	
 
            if repo_name in [ADMIN_PREFIX, '']:
 
                msg = M(self, 'invalid_repo_name', state, repo=repo_name)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_name=msg)
 
                )
 

	
 
            rename = old_data.get('repo_name') != repo_name_full
 
            create = not edit
 
            if rename or create:
 

	
 
                if group_path != '':
 
                    if Repository.get_by_repo_name(repo_name_full):
 
                        msg = M(self, 'repository_in_group_exists', state,
 
                                repo=repo_name, group=group_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                elif RepoGroup.get_by_group_name(repo_name_full):
 
                        msg = M(self, 'same_group_exists', state,
 
                                repo=repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 

	
 
                elif Repository.get_by_repo_name(repo_name_full):
 
                        msg = M(self, 'repository_exists', state,
 
                                repo=repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidForkName(*args, **kwargs):
 
    return ValidRepoName(*args, **kwargs)
 

	
 

	
 
def SlugifyName():
 
    class _validator(formencode.validators.FancyValidator):
 

	
 
        def _to_python(self, value, state):
 
            return repo_name_slug(value)
 

	
 
        def validate_python(self, value, state):
 
            pass
 

	
 
    return _validator
 

	
 

	
 
def ValidCloneUri():
 
    from rhodecode.lib.utils import make_ui
 

	
 
    def url_handler(repo_type, url, ui=None):
 
        if repo_type == 'hg':
 
            from rhodecode.lib.vcs.backends.hg.repository import MercurialRepository
 
            from mercurial.httppeer import httppeer
 
            if url.startswith('http'):
 
                ## initially check if it's at least the proper URL
 
                ## or does it pass basic auth
 
                MercurialRepository._check_url(url)
 
                httppeer(ui, url)._capabilities()
 
            elif url.startswith('svn+http'):
 
                from hgsubversion.svnrepo import svnremoterepo
 
                svnremoterepo(ui, url).capabilities
 
            elif url.startswith('git+http'):
 
                raise NotImplementedError()
 

	
 
        elif repo_type == 'git':
 
            from rhodecode.lib.vcs.backends.git.repository import GitRepository
 
            if url.startswith('http'):
 
                ## initially check if it's at least the proper URL
 
                ## or does it pass basic auth
 
                GitRepository._check_url(url)
 
            elif url.startswith('svn+http'):
 
                raise NotImplementedError()
 
            elif url.startswith('hg+http'):
 
                raise NotImplementedError()
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'clone_uri': _(u'invalid clone url'),
 
            'invalid_clone_uri': _(u'Invalid clone url, provide a '
 
                                    'valid clone http(s)/svn+http(s) url')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 

	
 
            if not url:
 
                pass
 
            else:
 
                try:
 
                    url_handler(repo_type, url, make_ui('db', clear_session=False))
 
                except Exception:
 
                    log.exception('Url validation failed')
 
                    msg = M(self, 'clone_uri')
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(clone_uri=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidForkType(old_data={}):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_fork_type': _(u'Fork have to be the same type as parent')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                msg = M(self, 'invalid_fork_type', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
    return _validator
 

	
 

	
 
def CanWriteGroup():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _(u"You don't have permissions "
 
                                   "to create repository in this group"),
 
            'permission_denied_root': _(u"no permission to create repository "
 
                                        "in root location")
 
        }
 

	
 
        def _to_python(self, value, state):
 
            #root location
 
            if value in [-1, "-1"]:
 
                return None
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr else None  # None means ROOT location
 
            val = HasReposGroupPermissionAny('group.write', 'group.admin')
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            forbidden = not val(gr_name, 'can write into group validator')
 
            #parent group need to be existing
 
            if gr and forbidden:
 
                msg = M(self, 'permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
            ## check if we can write to root location !
 
            elif gr is None and can_create_repos() is False:
 
                msg = M(self, 'permission_denied_root', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _(u"You don't have permissions "
 
                                   "to create a group in this location")
 
        }
 

	
 
        def to_python(self, value, state):
 
            #root location
 
            if value in [-1, "-1"]:
 
                return None
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr else None  # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                #we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and can_create_in_root is False
 
            val = HasReposGroupPermissionAny('group.admin')
 
            forbidden = not val(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = M(self, 'permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(group_parent_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _(u'This username or users group name is not valid')
 
                _(u'This username or user group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = OrderedSet()
 
            perms_new = OrderedSet()
 
            # build a list of permission to update and new permission to create
 

	
 
            #CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
 
            new_perms_group = defaultdict(dict)
 
            for k, v in value.copy().iteritems():
 
                if k.startswith('perm_new_member'):
 
                    del value[k]
 
                    _type, part = k.split('perm_new_member_')
 
                    args = part.split('_')
 
                    if len(args) == 1:
 
                        new_perms_group[args[0]]['perm'] = v
 
                    elif len(args) == 2:
 
                        _key, pos = args
 
                        new_perms_group[pos][_key] = v
 

	
 
            # fill new permissions in order of how they were added
 
            for k in sorted(map(int, new_perms_group.keys())):
 
                perm_dict = new_perms_group[str(k)]
 
                new_member = perm_dict.get('name')
 
                new_perm = perm_dict.get('perm')
 
                new_type = perm_dict.get('type')
 
                if new_member and new_perm and new_type:
 
                    perms_new.add((new_member, new_perm, new_type))
 

	
 
            for k, v in value.iteritems():
 
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
 
                    if member == 'default':
 
                        if value.get('repo_private'):
 
                            # set none for default when updating to
 
                            # private repo
 
                            v = EMPTY_PERM
 
                    perms_update.add((member, v, t))
 
            #always set NONE when private flag is set
 
            if value.get('repo_private'):
 
                perms_update.add(('default', EMPTY_PERM, 'user'))
 

	
 
            value['perms_updates'] = list(perms_update)
 
            value['perms_new'] = list(perms_new)
 

	
 
            # update permissions
 
            for k, v, t in perms_new:
 
                try:
 
                    if t is 'user':
 
                        self.user_db = User.query()\
 
                            .filter(User.active == True)\
 
                            .filter(User.username == k).one()
 
                    if t is 'users_group':
 
                        self.user_db = UsersGroup.query()\
 
                            .filter(UsersGroup.users_group_active == True)\
 
                            .filter(UsersGroup.users_group_name == k).one()
 

	
 
                except Exception:
 
                    log.exception('Updated permission failed')
 
                    msg = M(self, 'perm_new_member_type', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(perm_new_member_name=msg)
 
                    )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidSettings():
 
    class _validator(formencode.validators.FancyValidator):
 
        def _to_python(self, value, state):
 
            # settings  form for users that are not admin
 
            # can't edit certain parameters, it's extra backup if they mangle
 
            # with forms
 

	
 
            forbidden_params = [
 
                'user', 'repo_type', 'repo_enable_locking',
 
                'repo_enable_downloads', 'repo_enable_statistics'
 
            ]
 

	
 
            for param in forbidden_params:
 
                if param in value:
 
                    del value[param]
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            pass
 
    return _validator
 

	
 

	
 
def ValidPath():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_path': _(u'This is not a valid path')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if not os.path.isdir(value):
 
                msg = M(self, 'invalid_path', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(paths_root_path=msg)
 
                )
 
    return _validator
 

	
 

	
 
def UniqSystemEmail(old_data={}):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'email_taken': _(u'This e-mail address is already taken')
 
        }
 

	
 
        def _to_python(self, value, state):
 
            return value.lower()
 

	
 
        def validate_python(self, value, state):
 
            if (old_data.get('email') or '').lower() != value:
 
                user = User.get_by_email(value, case_insensitive=True)
 
                if user:
 
                    msg = M(self, 'email_taken', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(email=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidSystemEmail():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'non_existing_email': _(u'e-mail "%(email)s" does not exist.')
 
        }
 

	
 
        def _to_python(self, value, state):
 
            return value.lower()
 

	
 
        def validate_python(self, value, state):
 
            user = User.get_by_email(value, case_insensitive=True)
 
            if user is None:
 
                msg = M(self, 'non_existing_email', state, email=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(email=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def LdapLibValidator():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 

	
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                import ldap
 
                ldap  # pyflakes silence !
 
            except ImportError:
 
                raise LdapImportError()
 

	
 
    return _validator
 

	
 

	
 
def AttrLoginValidator():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_cn':
 
                  _(u'The LDAP Login attribute of the CN must be specified - '
 
                    'this is the name of the attribute that is equivalent '
 
                    'to "username"')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if not value or not isinstance(value, (str, unicode)):
 
                msg = M(self, 'invalid_cn', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(ldap_attr_login=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def NotReviewedRevisions(repo_id):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'rev_already_reviewed':
 
                  _(u'Revisions %(revs)s are already part of pull request '
 
                    'or have set status')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            # check revisions if they are not reviewed, or a part of another
 
            # pull request
 
            statuses = ChangesetStatus.query()\
 
                .filter(ChangesetStatus.revision.in_(value))\
 
                .filter(ChangesetStatus.repo_id == repo_id)\
 
                .all()
 

	
 
            errors = []
 
            for cs in statuses:
 
                if cs.pull_request_id:
 
                    errors.append(['pull_req', cs.revision[:12]])
 
                elif cs.status:
 
                    errors.append(['status', cs.revision[:12]])
 

	
 
            if errors:
 
                revs = ','.join([x[1] for x in errors])
 
                msg = M(self, 'rev_already_reviewed', state, revs=revs)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(revisions=revs)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidIp():
 
    class _validator(CIDR):
 
        messages = dict(
 
            badFormat=_('Please enter a valid IPv4 or IpV6 address'),
 
            illegalBits=_('The network size (bits) must be within the range'
 
                ' of 0-32 (not %(bits)r)'))
 

	
 
        def to_python(self, value, state):
 
            v = super(_validator, self).to_python(value, state)
 
            v = v.strip()
 
            net = ipaddr.IPNetwork(address=v)
 
            if isinstance(net, ipaddr.IPv4Network):
 
                #if IPv4 doesn't end with a mask, add /32
 
                if '/' not in value:
 
                    v += '/32'
 
            if isinstance(net, ipaddr.IPv6Network):
 
                #if IPv6 doesn't end with a mask, add /128
 
                if '/' not in value:
 
                    v += '/128'
 
            return v
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                addr = value.strip()
 
                #this raises an ValueError if address is not IpV4 or IpV6
 
                ipaddr.IPNetwork(address=addr)
 
            except ValueError:
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 

	
 
    return _validator
 

	
 

	
 
def FieldKey():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            badFormat=_('Key name can only consist of letters, '
 
                        'underscore, dash or numbers'),)
 

	
 
        def validate_python(self, value, state):
 
            if not re.match('[a-zA-Z0-9_-]+$', value):
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 
    return _validator
rhodecode/templates/admin/repos_groups/repos_groups_show.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="/base/base.html"/>
 

	
 
<%def name="title()">
 
    ${_('Repository groups administration')} - ${c.rhodecode_name}
 
</%def>
 

	
 

	
 
<%def name="breadcrumbs_links()">
 
    ${h.link_to(_('Admin'),h.url('admin_home'))}
 
    &raquo;
 
    ${_('repositories groups')}
 
    ${_('repository groups')}
 
</%def>
 
<%def name="page_nav()">
 
    ${self.menu('admin')}
 
</%def>
 
<%def name="main()">
 
<div class="box">
 
    <!-- box / title -->
 
    <div class="title">
 
        ${self.breadcrumbs()}
 
        <ul class="links">
 
          <li>
 
            %if h.HasPermissionAny('hg.admin')():
 
             <span>${h.link_to(_(u'Add group'),h.url('new_repos_group'))}</span>
 
            %endif
 
          </li>
 
        </ul>
 
    </div>
 
    <!-- end box / title -->
 
    <div class="table">
 
           % if c.groups:
 
            <table class="table_disp">
 

	
 
                <thead>
 
                    <tr>
 
                        <th class="left"><a href="#">${_('Group name')}</a></th>
 
                        <th class="left"><a href="#">${_('Description')}</a></th>
 
                        <th class="left"><a href="#">${_('Number of toplevel repositories')}</a></th>
 
                        <th class="left" colspan="2">${_('action')}</th>
 
                    </tr>
 
                </thead>
 

	
 
                ## REPO GROUPS
 

	
 
                % for gr in c.groups:
 
                    <% gr_cn = gr.repositories.count() %>
 
                  <tr>
 
                      <td>
 
                          <div style="white-space: nowrap">
 
                          <img class="icon" alt="${_('Repository group')}" src="${h.url('/images/icons/database_link.png')}"/>
 
                          ${h.link_to(h.literal(' &raquo; '.join(map(h.safe_unicode,[g.name for g in gr.parents+[gr]]))), url('repos_group_home',group_name=gr.group_name))}
 
                          </div>
 
                      </td>
 
                      <td>${gr.group_description}</td>
 
                      <td><b>${gr_cn}</b></td>
 
                      <td>
 
                       <a href="${h.url('edit_repos_group',group_name=gr.group_name)}" title="${_('edit')}">
 
                         ${h.submit('edit_%s' % gr.group_name,_('edit'),class_="edit_icon action_button")}
 
                       </a>
 
                      </td>
 
                      <td>
 
                       ${h.form(url('repos_group', group_name=gr.group_name),method='delete')}
 
                         ${h.submit('remove_%s' % gr.name,_('delete'),class_="delete_icon action_button",onclick="return confirm('"+ungettext('Confirm to delete this group: %s with %s repository','Confirm to delete this group: %s with %s repositories',gr_cn) % (gr.name,gr_cn)+"');")}
 
                       ${h.end_form()}
 
                      </td>
 
                  </tr>
 
                % endfor
 

	
 
            </table>
 
            % else:
 
                ${_('There are no repository groups yet')}
 
            % endif
 

	
 
    </div>
 
</div>
 

	
 
</%def>
rhodecode/templates/admin/users_groups/users_group_edit.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="/base/base.html"/>
 

	
 
<%def name="title()">
 
    ${_('Edit users group')} ${c.users_group.users_group_name} - ${c.rhodecode_name}
 
    ${_('Edit user group')} ${c.users_group.users_group_name} - ${c.rhodecode_name}
 
</%def>
 

	
 
<%def name="breadcrumbs_links()">
 
    ${h.link_to(_('Admin'),h.url('admin_home'))}
 
    &raquo;
 
    ${h.link_to(_('UsersGroups'),h.url('users_groups'))}
 
    &raquo;
 
    ${_('edit')} "${c.users_group.users_group_name}"
 
</%def>
 

	
 
<%def name="page_nav()">
 
    ${self.menu('admin')}
 
</%def>
 

	
 
<%def name="main()">
 
<div class="box box-left">
 
    <!-- box / title -->
 
    <div class="title">
 
        ${self.breadcrumbs()}
 
    </div>
 
    <!-- end box / title -->
 
    ${h.form(url('users_group', id=c.users_group.users_group_id),method='put', id='edit_users_group')}
 
    <div class="form">
 
        <!-- fields -->
 
            <div class="fields">
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="users_group_name">${_('Group name')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.text('users_group_name',class_='small')}
 
                    </div>
 
                 </div>
 

	
 
                 <div class="field">
 
                    <div class="label label-checkbox">
 
                        <label for="users_group_active">${_('Active')}:</label>
 
                    </div>
 
                    <div class="checkboxes">
 
                        ${h.checkbox('users_group_active',value=True)}
 
                    </div>
 
                 </div>
 
                <div class="field">
 
                    <div class="label">
 
                        <label for="users_group_active">${_('Members')}:</label>
 
                    </div>
 
                    <div class="select">
 
                        <table>
 
                                <tr>
 
                                    <td>
 
                                        <div>
 
                                            <div style="float:left">
 
                                                <div class="text" style="padding: 0px 0px 6px;">${_('Choosen group members')}</div>
 
                                                ${h.select('users_group_members',[x[0] for x in c.group_members],c.group_members,multiple=True,size=8,style="min-width:210px")}
 
                                               <div  id="remove_all_elements" style="cursor:pointer;text-align:center">
 
                                                   ${_('Remove all elements')}
 
                                                   <img alt="remove" style="vertical-align:text-bottom" src="${h.url('/images/icons/arrow_right.png')}"/>
 
                                               </div>
 
                                            </div>
 
                                            <div style="float:left;width:20px;padding-top:50px">
 
                                                <img alt="add" id="add_element"
 
                                                    style="padding:2px;cursor:pointer"
 
                                                    src="${h.url('/images/icons/arrow_left.png')}"/>
 
                                                <br />
 
                                                <img alt="remove" id="remove_element"
 
                                                    style="padding:2px;cursor:pointer"
 
                                                    src="${h.url('/images/icons/arrow_right.png')}"/>
 
                                            </div>
 
                                            <div style="float:left">
 
                                                 <div class="text" style="padding: 0px 0px 6px;">${_('Available members')}</div>
 
                                                 ${h.select('available_members',[],c.available_members,multiple=True,size=8,style="min-width:210px")}
 
                                                 <div id="add_all_elements" style="cursor:pointer;text-align:center">
 
                                                       <img alt="add" style="vertical-align:text-bottom" src="${h.url('/images/icons/arrow_left.png')}"/>
 
                                                        ${_('Add all elements')}
 
                                                 </div>
 
                                            </div>
 
                                        </div>
 
                                    </td>
 
                                </tr>
 
                        </table>
 
                    </div>
 

	
 
                </div>
 
                <div class="buttons">
 
                  ${h.submit('save',_('save'),class_="ui-btn large")}
 
                </div>
 
            </div>
 
    </div>
 
${h.end_form()}
 
</div>
 

	
 
<div class="box box-right">
 
    <!-- box / title -->
 
    <div class="title">
 
        <h5>${_('Permissions')}</h5>
 
    </div>
 
    ${h.form(url('users_group_perm', id=c.users_group.users_group_id), method='put')}
 
    <div class="form">
 
        <!-- fields -->
 
        <div class="fields">
 
             <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="inherit_permissions">${_('Inherit default permissions')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('inherit_default_permissions',value=True)}
 
                </div>
 
                <span class="help-block">${h.literal(_('Select to inherit permissions from %s settings. '
 
                                             'With this selected below options does not have any action') % h.link_to('default', url('edit_permission', id='default')))}</span>
 
             </div>
 
             <div id="inherit_overlay" style="${'opacity:0.3' if c.users_group.inherit_default_permissions else ''}" >
 
             <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="create_repo_perm">${_('Create repositories')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('create_repo_perm',value=True)}
 
                </div>
 
             </div>
 
             <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="fork_repo_perm">${_('Fork repositories')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('fork_repo_perm',value=True)}
 
                </div>
 
             </div>
 
             </div>
 
            <div class="buttons">
 
              ${h.submit('save',_('Save'),class_="ui-btn large")}
 
              ${h.reset('reset',_('Reset'),class_="ui-btn large")}
 
            </div>
 
        </div>
 
    </div>
 
    ${h.end_form()}
 
</div>
 

	
 
<div class="box box-right">
 
    <!-- box / title -->
 
    <div class="title">
 
        <h5>${_('Group members')}</h5>
 
    </div>
 

	
 
    <div class="group_members_wrap">
 
    % if c.group_members_obj:
 
      <ul class="group_members">
 
      %for user in c.group_members_obj:
 
        <li>
 
          <div class="group_member">
 
            <div class="gravatar"><img alt="gravatar" src="${h.gravatar_url(user.email,24)}"/> </div>
 
            <div>${h.link_to(user.username, h.url('edit_user',id=user.user_id))}</div>
 
            <div>${user.full_name}</div>
 
          </div>
 
        </li>
 
      %endfor
 
      </ul>
 
      %else:
 
        <span class="empty_data">${_('No members yet')}</span>
 
      %endif
 
    </div>
 
</div>
 

	
 
<div class="box box-left">
 
    <!-- box / title -->
 
    <div class="title">
 
        <h5>${_('Permissions defined for this group')}</h5>
 
    </div>
 
 ## permissions overview
 
    <div id="perms" class="table">
 
       %for section in sorted(c.users_group.permissions.keys()):
 
          <div class="perms_section_head">${section.replace("_"," ").capitalize()}</div>
 
          %if not c.users_group.permissions:
 
              <span class="empty_data">${_('No permissions set yet')}</span>
 
          %else:
 
          <div id='tbl_list_wrap_${section}' class="yui-skin-sam">
 
           <table id="tbl_list_repository">
 
            <thead>
 
                <tr>
 
                <th class="left">${_('Name')}</th>
 
                <th class="left">${_('Permission')}</th>
 
                <th class="left">${_('Edit Permission')}</th>
 
            </thead>
 
            <tbody>
 
            %for k in c.users_group.permissions[section]:
 
                 <%
 
                     section_perm = c.users_group.permissions[section].get(k)
 
                     _perm = section_perm.split('.')[-1]
 
                 %>
 
                <tr>
 
                    <td>
 
                        %if section == 'repositories':
 
                            <a href="${h.url('summary_home',repo_name=k)}">${k}</a>
 
                        %elif section == 'repositories_groups':
 
                            <a href="${h.url('repos_group_home',group_name=k)}">${k}</a>
 
                        %endif
 
                    </td>
 
                    <td>
 
                         <span class="perm_tag ${_perm}">${section_perm}</span>
 
                    </td>
 
                    <td>
 
                        %if section == 'repositories':
 
                            <a href="${h.url('edit_repo',repo_name=k,anchor='permissions_manage')}">${_('edit')}</a>
 
                        %elif section == 'repositories_groups':
 
                            <a href="${h.url('edit_repos_group',group_name=k,anchor='permissions_manage')}">${_('edit')}</a>
 
                        %else:
 
                            --
 
                        %endif
 
                    </td>
 
                </tr>
 
            %endfor
 
            </tbody>
 
           </table>
 
          </div>
 
          %endif
 
       %endfor
 
    </div>
 
</div>
 

	
 

	
 
<script type="text/javascript">
 
  MultiSelectWidget('users_group_members','available_members','edit_users_group');
 
</script>
 
</%def>
rhodecode/tests/api/api_base.py
Show inline comments
 
@@ -272,1023 +272,1023 @@ class BaseTestApi(object):
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_rescan_repos(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos')
 
        response = api_call(self, params)
 

	
 
        expected = {'added': [], 'removed': []}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'repo_scan', crash)
 
    def test_api_rescann_error(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos',)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during rescan repositories action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache(self):
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = ("Cache for repository `%s` was invalidated: "
 
                    "invalidated cache keys: %s" % (self.REPO,
 
                                                    [unicode(self.REPO)]))
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
 
    def test_api_invalidate_cache_error(self):
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during cache invalidation action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = ('User `%s` set lock state for repo `%s` to `%s`'
 
                   % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire_by_non_admin(self):
 
        repo_name = 'api_delete_me'
 
        create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'lock',
 
                                      repoid=repo_name,
 
                                      locked=True)
 
            response = api_call(self, params)
 
            expected = ('User `%s` set lock state for repo `%s` to `%s`'
 
                       % (self.TEST_USER_LOGIN, repo_name, True))
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            destroy_repo(repo_name)
 

	
 
    def test_api_lock_repo_lock_aquire_non_admin_with_userid(self):
 
        repo_name = 'api_delete_me'
 
        create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'lock',
 
                                      userid=TEST_USER_ADMIN_LOGIN,
 
                                      repoid=repo_name,
 
                                      locked=True)
 
            response = api_call(self, params)
 
            expected = 'userid is not the same as your user'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            destroy_repo(repo_name)
 

	
 
    def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
 
        id_, params = _build_data(self.apikey_regular, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_release(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=False)
 
        response = api_call(self, params)
 
        expected = ('User `%s` set lock state for repo `%s` to `%s`'
 
                   % (TEST_USER_ADMIN_LOGIN, self.REPO, False))
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire_optional_userid(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = ('User `%s` set lock state for repo `%s` to `%s`'
 
                   % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(Repository, 'lock', crash)
 
    def test_api_lock_error(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred locking repository `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_existing_user(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN,
 
                                  email='test@foo.com',
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user_with_existing_email(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN + 'new',
 
                                  email=TEST_USER_REGULAR_EMAIL,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user(self):
 
        username = 'test_new_api_user'
 
        email = username + "@foo.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        UserModel().delete(usr.user_id)
 
        Session().commit()
 

	
 
    @mock.patch.object(UserModel, 'create_or_update', crash)
 
    def test_api_create_user_when_exception_happened(self):
 

	
 
        username = 'test_new_api_user'
 
        email = username + "@foo.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 
        expected = 'failed to create user `%s`' % username
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_delete_user(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@rhodecode.org',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 
        email = usr.email
 
        usr_id = usr.user_id
 
        ## DELETE THIS USER NOW
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user',
 
                                  userid=username,)
 
        response = api_call(self, params)
 

	
 
        ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
 
               'user': None}
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'delete', crash)
 
    def test_api_delete_user_when_exception_happened(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@rhodecode.org',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user',
 
                                  userid=username,)
 
        response = api_call(self, params)
 
        ret = 'failed to delete ID:%s %s' % (usr.user_id,
 
                                             usr.username)
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('firstname', 'new_username'),
 
                           ('lastname', 'new_username'),
 
                           ('email', 'new_username'),
 
                           ('admin', True),
 
                           ('admin', False),
 
                           ('ldap_dn', 'test'),
 
                           ('ldap_dn', None),
 
                           ('active', False),
 
                           ('active', True),
 
                           ('password', 'newpass')
 
                           ])
 
    def test_api_update_user(self, name, expected):
 
        usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
 
        kw = {name: expected,
 
              'userid': usr.user_id}
 
        id_, params = _build_data(self.apikey, 'update_user', **kw)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
        'msg': 'updated user ID:%s %s' % (usr.user_id, self.TEST_USER_LOGIN),
 
        'user': jsonify(UserModel()\
 
                            .get_by_username(self.TEST_USER_LOGIN)\
 
                            .get_api_data())
 
        }
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_no_changed_params(self):
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
        'msg': 'updated user ID:%s %s' % (usr.user_id, TEST_USER_ADMIN_LOGIN),
 
        'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_by_user_id(self):
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
        'msg': 'updated user ID:%s %s' % (usr.user_id, TEST_USER_ADMIN_LOGIN),
 
        'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'update_user', crash)
 
    def test_api_update_user_when_exception_happens(self):
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = 'failed to update user `%s`' % usr.user_id
 

	
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo(self):
 
        new_group = 'some_new_group'
 
        make_users_group(new_group)
 
        RepoModel().grant_users_group_permission(repo=self.REPO,
 
                                                 group_name=new_group,
 
                                                 perm='repository.read')
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = user.get_api_data()
 
            user_data['type'] = "user"
 
            user_data['permission'] = perm
 
            members.append(user_data)
 

	
 
        for users_group in repo.users_group_to_perm:
 
            perm = users_group.permission.permission_name
 
            users_group = users_group.users_group
 
            users_group_data = users_group.get_api_data()
 
            users_group_data['type'] = "users_group"
 
            users_group_data['permission'] = perm
 
            members.append(users_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        destroy_users_group(new_group)
 

	
 
    def test_api_get_repo_by_non_admin(self):
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = user.get_api_data()
 
            user_data['type'] = "user"
 
            user_data['permission'] = perm
 
            members.append(user_data)
 

	
 
        for users_group in repo.users_group_to_perm:
 
            perm = users_group.permission.permission_name
 
            users_group = users_group.users_group
 
            users_group_data = users_group.get_api_data()
 
            users_group_data['type'] = "users_group"
 
            users_group_data['permission'] = perm
 
            members.append(users_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 

	
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_that_doesn_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid='no-such-repo')
 
        response = api_call(self, params)
 

	
 
        ret = 'repository `%s` does not exist' % 'no-such-repo'
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos(self):
 
        id_, params = _build_data(self.apikey, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        result = []
 
        for repo in RepoModel().get_all():
 
            result.append(repo.get_api_data())
 
        ret = jsonify(result)
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        result = []
 
        for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN):
 
            result.append(repo.get_api_data())
 
        ret = jsonify(result)
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('all', 'all'),
 
                           ('dirs', 'dirs'),
 
                           ('files', 'files'), ])
 
    def test_api_get_repo_nodes(self, name, ret_type):
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = json.loads(response.body)['result']
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_revisions(self):
 
        rev = 'i-dont-exist'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_path(self):
 
        rev = 'tip'
 
        path = '/idontexits'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_ret_type(self):
 
        rev = 'tip'
 
        path = '/'
 
        ret_type = 'error'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        expected = 'ret_type must be one of %s' % (['files', 'dirs', 'all'])
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo(self):
 
        repo_name = 'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                    repo_name=repo_name,
 
                                    owner=TEST_USER_ADMIN_LOGIN,
 
                                    repo_type='hg',
 
                                  )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'repo': jsonify(repo.get_api_data())
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_unknown_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                    repo_name=repo_name,
 
                                    owner=owner,
 
                                    repo_type='hg',
 
                                  )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dont_specify_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                    repo_name=repo_name,
 
                                    repo_type='hg',
 
                                  )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'repo': jsonify(repo.get_api_data())
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                    repo_name=repo_name,
 
                                    repo_type='hg',
 
                                  )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'repo': jsonify(repo.get_api_data())
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin_specify_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                    repo_name=repo_name,
 
                                    repo_type='hg',
 
                                    owner=owner
 
                                  )
 
        response = api_call(self, params)
 

	
 
        expected = 'Only RhodeCode admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_exists(self):
 
        repo_name = self.REPO
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                    repo_name=repo_name,
 
                                    owner=TEST_USER_ADMIN_LOGIN,
 
                                    repo_type='hg',
 
                                  )
 
        response = api_call(self, params)
 
        expected = "repo `%s` already exist" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create_repo', crash)
 
    def test_api_create_repo_exception_occurred(self):
 
        repo_name = 'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                    repo_name=repo_name,
 
                                    owner=TEST_USER_ADMIN_LOGIN,
 
                                    repo_type='hg',
 
                                  )
 
        response = api_call(self, params)
 
        expected = 'failed to create repository `%s`' % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_delete_repo(self):
 
        repo_name = 'api_delete_me'
 
        create_repo(repo_name, self.REPO_TYPE)
 

	
 
        id_, params = _build_data(self.apikey, 'delete_repo',
 
                                  repoid=repo_name,)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_delete_repo_by_non_admin(self):
 
        repo_name = 'api_delete_me'
 
        create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                      repoid=repo_name,)
 
            response = api_call(self, params)
 

	
 
            ret = {
 
                'msg': 'Deleted repository `%s`' % repo_name,
 
                'success': True
 
            }
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin_no_permission(self):
 
        repo_name = 'api_delete_me'
 
        create_repo(repo_name, self.REPO_TYPE)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                      repoid=repo_name,)
 
            response = api_call(self, params)
 
            expected = 'repository `%s` does not exist' % (repo_name)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_exception_occurred(self):
 
        repo_name = 'api_delete_me'
 
        create_repo(repo_name, self.REPO_TYPE)
 
        try:
 
            with mock.patch.object(RepoModel, 'delete', crash):
 
                id_, params = _build_data(self.apikey, 'delete_repo',
 
                                          repoid=repo_name,)
 
                response = api_call(self, params)
 

	
 
                expected = 'failed to delete repository `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            destroy_repo(repo_name)
 

	
 
    def test_api_fork_repo(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                    repoid=self.REPO,
 
                                    fork_name=fork_name,
 
                                    owner=TEST_USER_ADMIN_LOGIN,
 
                                  )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                    repoid=self.REPO,
 
                                    fork_name=fork_name,
 
                                  )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_specify_owner(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                    repoid=self.REPO,
 
                                    fork_name=fork_name,
 
                                    owner=TEST_USER_ADMIN_LOGIN,
 
                                  )
 
        response = api_call(self, params)
 
        expected = 'Only RhodeCode admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_no_permission_to_fork(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                    repoid=self.REPO,
 
                                    fork_name=fork_name,
 
                                  )
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 
        destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_unknown_owner(self):
 
        fork_name = 'api-repo-fork'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                    repoid=self.REPO,
 
                                    fork_name=fork_name,
 
                                    owner=owner,
 
                                  )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_fork_repo_fork_exists(self):
 
        fork_name = 'api-repo-fork'
 
        create_fork(fork_name, self.REPO_TYPE, self.REPO)
 

	
 
        try:
 
            fork_name = 'api-repo-fork'
 

	
 
            id_, params = _build_data(self.apikey, 'fork_repo',
 
                                        repoid=self.REPO,
 
                                        fork_name=fork_name,
 
                                        owner=TEST_USER_ADMIN_LOGIN,
 
                                      )
 
            response = api_call(self, params)
 

	
 
            expected = "fork `%s` already exist" % fork_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_repo_exists(self):
 
        fork_name = self.REPO
 

	
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                    repoid=self.REPO,
 
                                    fork_name=fork_name,
 
                                    owner=TEST_USER_ADMIN_LOGIN,
 
                                  )
 
        response = api_call(self, params)
 

	
 
        expected = "repo `%s` already exist" % fork_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create_fork', crash)
 
    def test_api_fork_repo_exception_occurred(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                    repoid=self.REPO,
 
                                    fork_name=fork_name,
 
                                    owner=TEST_USER_ADMIN_LOGIN,
 
                                  )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
 
                                                               fork_name)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_users_group(self):
 
        id_, params = _build_data(self.apikey, 'get_users_group',
 
                                  usersgroupid=TEST_USERS_GROUP)
 
        response = api_call(self, params)
 

	
 
        users_group = UsersGroupModel().get_group(TEST_USERS_GROUP)
 
        members = []
 
        for user in users_group.members:
 
            user = user.user
 
            members.append(user.get_api_data())
 

	
 
        ret = users_group.get_api_data()
 
        ret['members'] = members
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_users_groups(self):
 

	
 
        make_users_group('test_users_group2')
 

	
 
        id_, params = _build_data(self.apikey, 'get_users_groups',)
 
        response = api_call(self, params)
 

	
 
        expected = []
 
        for gr_name in [TEST_USERS_GROUP, 'test_users_group2']:
 
            users_group = UsersGroupModel().get_group(gr_name)
 
            ret = users_group.get_api_data()
 
            expected.append(ret)
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        UsersGroupModel().delete(users_group='test_users_group2')
 
        Session().commit()
 

	
 
    def test_api_create_users_group(self):
 
        group_name = 'some_new_group'
 
        id_, params = _build_data(self.apikey, 'create_users_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'created new users group `%s`' % group_name,
 
            'msg': 'created new user group `%s`' % group_name,
 
            'users_group': jsonify(UsersGroupModel()\
 
                                   .get_by_name(group_name)\
 
                                   .get_api_data())
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        destroy_users_group(group_name)
 

	
 
    def test_api_get_users_group_that_exist(self):
 
        id_, params = _build_data(self.apikey, 'create_users_group',
 
                                  group_name=TEST_USERS_GROUP)
 
        response = api_call(self, params)
 

	
 
        expected = "users group `%s` already exist" % TEST_USERS_GROUP
 
        expected = "user group `%s` already exist" % TEST_USERS_GROUP
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UsersGroupModel, 'create', crash)
 
    def test_api_get_users_group_exception_occurred(self):
 
        group_name = 'exception_happens'
 
        id_, params = _build_data(self.apikey, 'create_users_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to create group `%s`' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_add_user_to_users_group(self):
 
        gr_name = 'test_group'
 
        UsersGroupModel().create(gr_name)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'add_user_to_users_group',
 
                                  usersgroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = {
 
                    'msg': 'added member `%s` to users group `%s`' % (
 
                    'msg': 'added member `%s` to user group `%s`' % (
 
                                TEST_USER_ADMIN_LOGIN, gr_name
 
                            ),
 
                    'success': True}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        UsersGroupModel().delete(users_group=gr_name)
 
        Session().commit()
 

	
 
    def test_api_add_user_to_users_group_that_doesnt_exist(self):
 
        id_, params = _build_data(self.apikey, 'add_user_to_users_group',
 
                                  usersgroupid='false-group',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'users group `%s` does not exist' % 'false-group'
 
        expected = 'user group `%s` does not exist' % 'false-group'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UsersGroupModel, 'add_user_to_group', crash)
 
    def test_api_add_user_to_users_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        UsersGroupModel().create(gr_name)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'add_user_to_users_group',
 
                                  usersgroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to add member to users group `%s`' % gr_name
 
        expected = 'failed to add member to user group `%s`' % gr_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
        UsersGroupModel().delete(users_group=gr_name)
 
        Session().commit()
 

	
 
    def test_api_remove_user_from_users_group(self):
 
        gr_name = 'test_group_3'
 
        gr = UsersGroupModel().create(gr_name)
 
        UsersGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_users_group',
 
                                  usersgroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = {
 
                    'msg': 'removed member `%s` from users group `%s`' % (
 
                    'msg': 'removed member `%s` from user group `%s`' % (
 
                                TEST_USER_ADMIN_LOGIN, gr_name
 
                            ),
 
                    'success': True}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        UsersGroupModel().delete(users_group=gr_name)
 
        Session().commit()
 

	
 
    @mock.patch.object(UsersGroupModel, 'remove_user_from_group', crash)
 
    def test_api_remove_user_from_users_group_exception_occurred(self):
 
        gr_name = 'test_group_3'
 
        gr = UsersGroupModel().create(gr_name)
 
        UsersGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_users_group',
 
                                  usersgroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to remove member from users group `%s`' % gr_name
 
        expected = 'failed to remove member from user group `%s`' % gr_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
        UsersGroupModel().delete(users_group=gr_name)
 
        Session().commit()
 

	
 
    @parameterized.expand([('none', 'repository.none'),
 
                           ('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_grant_user_permission(self, name, perm):
 
        id_, params = _build_data(self.apikey, 'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
                'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
 
                    perm, TEST_USER_ADMIN_LOGIN, self.REPO
 
                ),
 
                'success': True
 
            }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_grant_user_permission_wrong_permission(self):
 
        perm = 'haha.no.permission'
 
        id_, params = _build_data(self.apikey, 'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'permission `%s` does not exist' % perm
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'grant_user_permission', crash)
 
    def test_api_grant_user_permission_exception_when_adding(self):
 
        perm = 'repository.read'
 
        id_, params = _build_data(self.apikey, 'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, self.REPO
 
                )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_revoke_user_permission(self):
 
        id_, params = _build_data(self.apikey, 'revoke_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,)
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
 
                TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'revoke_user_permission', crash)
 
    def test_api_revoke_user_permission_exception_when_adding(self):
 
        id_, params = _build_data(self.apikey, 'revoke_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, self.REPO
 
                )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('none', 'repository.none'),
 
                           ('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_grant_users_group_permission(self, name, perm):
 
        id_, params = _build_data(self.apikey, 'grant_users_group_permission',
 
                                  repoid=self.REPO,
 
                                  usersgroupid=TEST_USERS_GROUP,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Granted perm: `%s` for users group: `%s` in repo: `%s`' % (
 
            'msg': 'Granted perm: `%s` for user group: `%s` in repo: `%s`' % (
 
                perm, TEST_USERS_GROUP, self.REPO
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_grant_users_group_permission_wrong_permission(self):
 
        perm = 'haha.no.permission'
 
        id_, params = _build_data(self.apikey, 'grant_users_group_permission',
 
                                  repoid=self.REPO,
 
                                  usersgroupid=TEST_USERS_GROUP,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'permission `%s` does not exist' % perm
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'grant_users_group_permission', crash)
 
    def test_api_grant_users_group_permission_exception_when_adding(self):
 
        perm = 'repository.read'
 
        id_, params = _build_data(self.apikey, 'grant_users_group_permission',
 
                                  repoid=self.REPO,
 
                                  usersgroupid=TEST_USERS_GROUP,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for users group: `%s` in repo: `%s`' % (
 
        expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
 
                    TEST_USERS_GROUP, self.REPO
 
                )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_revoke_users_group_permission(self):
 
        RepoModel().grant_users_group_permission(repo=self.REPO,
 
                                                 group_name=TEST_USERS_GROUP,
 
                                                 perm='repository.read')
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'revoke_users_group_permission',
 
                                  repoid=self.REPO,
 
                                  usersgroupid=TEST_USERS_GROUP,)
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': 'Revoked perm for users group: `%s` in repo: `%s`' % (
 
            'msg': 'Revoked perm for user group: `%s` in repo: `%s`' % (
 
                TEST_USERS_GROUP, self.REPO
 
            ),
 
            'success': True
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'revoke_users_group_permission', crash)
 
    def test_api_revoke_users_group_permission_exception_when_adding(self):
 

	
 
        id_, params = _build_data(self.apikey, 'revoke_users_group_permission',
 
                                  repoid=self.REPO,
 
                                  usersgroupid=TEST_USERS_GROUP,)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for users group: `%s` in repo: `%s`' % (
 
        expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
 
                    TEST_USERS_GROUP, self.REPO
 
                )
 
        self._compare_error(id_, expected, given=response.body)
rhodecode/tests/functional/test_admin_users_groups.py
Show inline comments
 
from rhodecode.tests import *
 
from rhodecode.model.db import UsersGroup, UsersGroupToPerm, Permission
 

	
 
TEST_USERS_GROUP = 'admins_test'
 

	
 

	
 
class TestAdminUsersGroupsController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url('users_groups'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_users_groups', format='xml'))
 

	
 
    def test_create(self):
 
        self.log_user()
 
        users_group_name = TEST_USERS_GROUP
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'active':True})
 
        response.follow()
 

	
 
        self.checkSessionFlash(response,
 
                               'created users group %s' % TEST_USERS_GROUP)
 
                               'created user group %s' % TEST_USERS_GROUP)
 

	
 
    def test_new(self):
 
        response = self.app.get(url('new_users_group'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_users_group', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('users_group', id=1))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('users_group', id=1),
 
                                 params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        users_group_name = TEST_USERS_GROUP + 'another'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name':users_group_name,
 
                                  'active':True})
 
        response.follow()
 

	
 
        self.checkSessionFlash(response,
 
                               'created users group %s' % users_group_name)
 
                               'created user group %s' % users_group_name)
 

	
 
        gr = self.Session.query(UsersGroup)\
 
                           .filter(UsersGroup.users_group_name ==
 
                                   users_group_name).one()
 

	
 
        response = self.app.delete(url('users_group', id=gr.users_group_id))
 

	
 
        gr = self.Session.query(UsersGroup)\
 
                           .filter(UsersGroup.users_group_name ==
 
                                   users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 

	
 
    def test_enable_repository_read_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USERS_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'active': True})
 
        response.follow()
 

	
 
        ug = UsersGroup.get_by_group_name(users_group_name)
 
        self.checkSessionFlash(response,
 
                               'created users group %s' % users_group_name)
 
                               'created user group %s' % users_group_name)
 
        ## ENABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('users_group_perm', id=ug.users_group_id),
 
                                 {'create_repo_perm': True})
 

	
 
        response.follow()
 
        ug = UsersGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.repository')
 
        p2 = Permission.get_by_key('hg.fork.none')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UsersGroupToPerm.query()\
 
            .filter(UsersGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            [[x.users_group_id, x.permission_id, ] for x in perms],
 
            [[ug.users_group_id, p.permission_id],
 
             [ug.users_group_id, p2.permission_id]]
 
        )
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('users_group_perm', id=ug.users_group_id),
 
                                    {})
 

	
 
        response.follow()
 
        ug = UsersGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.fork.none')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UsersGroupToPerm.query()\
 
            .filter(UsersGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            sorted([[x.users_group_id, x.permission_id, ] for x in perms]),
 
            sorted([[ug.users_group_id, p.permission_id],
 
             [ug.users_group_id, p2.permission_id]])
 
        )
 

	
 
        # DELETE !
 
        ug = UsersGroup.get_by_group_name(users_group_name)
 
        ugid = ug.users_group_id
 
        response = self.app.delete(url('users_group', id=ug.users_group_id))
 
        response = response.follow()
 
        gr = self.Session.query(UsersGroup)\
 
                           .filter(UsersGroup.users_group_name ==
 
                                   users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 
        p = Permission.get_by_key('hg.create.repository')
 
        perms = UsersGroupToPerm.query()\
 
            .filter(UsersGroupToPerm.users_group_id == ugid).all()
 
        perms = [[x.users_group_id,
 
                  x.permission_id, ] for x in perms]
 
        self.assertEqual(
 
            perms,
 
            []
 
        )
 

	
 
    def test_enable_repository_fork_on_group(self):
 
        self.log_user()
 
        users_group_name = TEST_USERS_GROUP + 'another2'
 
        response = self.app.post(url('users_groups'),
 
                                 {'users_group_name': users_group_name,
 
                                  'active': True})
 
        response.follow()
 

	
 
        ug = UsersGroup.get_by_group_name(users_group_name)
 
        self.checkSessionFlash(response,
 
                               'created users group %s' % users_group_name)
 
                               'created user group %s' % users_group_name)
 
        ## ENABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('users_group_perm', id=ug.users_group_id),
 
                                 {'fork_repo_perm': True})
 

	
 
        response.follow()
 
        ug = UsersGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.fork.repository')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UsersGroupToPerm.query()\
 
            .filter(UsersGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            [[x.users_group_id, x.permission_id, ] for x in perms],
 
            [[ug.users_group_id, p.permission_id],
 
             [ug.users_group_id, p2.permission_id]]
 
        )
 

	
 
        ## DISABLE REPO CREATE ON A GROUP
 
        response = self.app.put(url('users_group_perm', id=ug.users_group_id),
 
                                    {})
 

	
 
        response.follow()
 
        ug = UsersGroup.get_by_group_name(users_group_name)
 
        p = Permission.get_by_key('hg.create.none')
 
        p2 = Permission.get_by_key('hg.fork.none')
 
        # check if user has this perms, they should be here since
 
        # defaults are on
 
        perms = UsersGroupToPerm.query()\
 
            .filter(UsersGroupToPerm.users_group == ug).all()
 

	
 
        self.assertEqual(
 
            [[x.users_group_id, x.permission_id, ] for x in perms],
 
            [[ug.users_group_id, p.permission_id],
 
             [ug.users_group_id, p2.permission_id]]
 
        )
 

	
 
        # DELETE !
 
        ug = UsersGroup.get_by_group_name(users_group_name)
 
        ugid = ug.users_group_id
 
        response = self.app.delete(url('users_group', id=ug.users_group_id))
 
        response = response.follow()
 
        gr = self.Session.query(UsersGroup)\
 
                           .filter(UsersGroup.users_group_name ==
 
                                   users_group_name).scalar()
 

	
 
        self.assertEqual(gr, None)
 
        p = Permission.get_by_key('hg.fork.repository')
 
        perms = UsersGroupToPerm.query()\
 
            .filter(UsersGroupToPerm.users_group_id == ugid).all()
 
        perms = [[x.users_group_id,
 
                  x.permission_id, ] for x in perms]
 
        self.assertEqual(
 
            perms,
 
            []
 
        )
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('users_group', id=1),
 
                                 params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        response = self.app.get(url('users_group', id=1))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_users_group', id=1, format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_users_group', id=1))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_users_group', id=1, format='xml'))
 

	
 
    def test_assign_members(self):
 
        pass
 

	
 
    def test_add_create_permission(self):
 
        pass
 

	
 
    def test_revoke_members(self):
 
        pass
rhodecode/tests/models/test_permissions.py
Show inline comments
 
import os
 
import unittest
 
from rhodecode.tests import *
 
from rhodecode.tests.models.common import _make_group
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm
 
from rhodecode.model.user import UserModel
 

	
 
from rhodecode.model.meta import Session
 
from rhodecode.model.users_group import UsersGroupModel
 
from rhodecode.lib.auth import AuthUser
 
from rhodecode.tests.api.api_base import create_repo
 

	
 

	
 
class TestPermissions(unittest.TestCase):
 
    def __init__(self, methodName='runTest'):
 
        super(TestPermissions, self).__init__(methodName=methodName)
 

	
 
    def setUp(self):
 
        self.u1 = UserModel().create_or_update(
 
            username=u'u1', password=u'qweqwe',
 
            email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
 
        )
 
        self.u2 = UserModel().create_or_update(
 
            username=u'u2', password=u'qweqwe',
 
            email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
 
        )
 
        self.u3 = UserModel().create_or_update(
 
            username=u'u3', password=u'qweqwe',
 
            email=u'u3@rhodecode.org', firstname=u'u3', lastname=u'u3'
 
        )
 
        self.anon = User.get_by_username('default')
 
        self.a1 = UserModel().create_or_update(
 
            username=u'a1', password=u'qweqwe',
 
            email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
 
        )
 
        Session().commit()
 

	
 
    def tearDown(self):
 
        if hasattr(self, 'test_repo'):
 
            RepoModel().delete(repo=self.test_repo)
 

	
 
        UserModel().delete(self.u1)
 
        UserModel().delete(self.u2)
 
        UserModel().delete(self.u3)
 
        UserModel().delete(self.a1)
 
        if hasattr(self, 'g1'):
 
            ReposGroupModel().delete(self.g1.group_id)
 
        if hasattr(self, 'g2'):
 
            ReposGroupModel().delete(self.g2.group_id)
 

	
 
        if hasattr(self, 'ug1'):
 
            UsersGroupModel().delete(self.ug1, force=True)
 

	
 
        Session().commit()
 

	
 
    def test_default_perms_set(self):
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set([u'hg.create.repository', u'repository.read',
 
                           u'hg.register.manual_activate']),
 
            'repositories': {u'vcs_test_hg': u'repository.read'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
 
                                          perm=new_perm)
 
        Session().commit()
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm)
 

	
 
    def test_default_admin_perms_set(self):
 
        a1_auth = AuthUser(user_id=self.a1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set([u'hg.admin']),
 
            'repositories': {u'vcs_test_hg': u'repository.admin'}
 
        }
 
        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        new_perm = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
 
                                          perm=new_perm)
 
        Session().commit()
 
        # cannot really downgrade admins permissions !? they still get's set as
 
        # admin !
 
        u1_auth = AuthUser(user_id=self.a1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 

	
 
    def test_default_group_perms(self):
 
        self.g1 = _make_group('test1', skip_if_exists=True)
 
        self.g2 = _make_group('test2', skip_if_exists=True)
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
 
            'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
 
            'repositories': {u'vcs_test_hg': u'repository.read'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_default_admin_group_perms(self):
 
        self.g1 = _make_group('test1', skip_if_exists=True)
 
        self.g2 = _make_group('test2', skip_if_exists=True)
 
        a1_auth = AuthUser(user_id=self.a1.user_id)
 
        perms = {
 
            'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
 
            'global': set(['hg.admin']),
 
            'repositories': {u'vcs_test_hg': 'repository.admin'}
 
        }
 

	
 
        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_propagated_permission_from_users_group_by_explicit_perms_exist(self):
 
        # make group
 
        self.ug1 = UsersGroupModel().create('G1')
 
        # add user to group
 

	
 
        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
 

	
 
        # set permission to lower
 
        new_perm = 'repository.none'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm)
 

	
 
        # grant perm for group this should not override permission from user
 
        # since it has explicitly set
 
        new_perm_gr = 'repository.write'
 
        RepoModel().grant_users_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_gr)
 
        # check perms
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set([u'hg.create.repository', u'repository.read',
 
                           u'hg.register.manual_activate']),
 
            'repositories': {u'vcs_test_hg': u'repository.read'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_propagated_permission_from_users_group(self):
 
        # make group
 
        self.ug1 = UsersGroupModel().create('G1')
 
        # add user to group
 

	
 
        UsersGroupModel().add_user_to_group(self.ug1, self.u3)
 

	
 
        # grant perm for group this should override default permission from user
 
        new_perm_gr = 'repository.write'
 
        RepoModel().grant_users_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_gr)
 
        # check perms
 
        u3_auth = AuthUser(user_id=self.u3.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set([u'hg.create.repository', u'repository.read',
 
                           u'hg.register.manual_activate']),
 
            'repositories': {u'vcs_test_hg': u'repository.read'}
 
        }
 
        self.assertEqual(u3_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_gr)
 
        self.assertEqual(u3_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_propagated_permission_from_users_group_lower_weight(self):
 
        # make group
 
        self.ug1 = UsersGroupModel().create('G1')
 
        # add user to group
 
        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
 

	
 
        # set permission to lower
 
        new_perm_h = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
 
                                          perm=new_perm_h)
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_h)
 

	
 
        # grant perm for group this should NOT override permission from user
 
        # since it's lower than granted
 
        new_perm_l = 'repository.read'
 
        RepoModel().grant_users_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_l)
 
        # check perms
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set([u'hg.create.repository', u'repository.read',
 
                           u'hg.register.manual_activate']),
 
            'repositories': {u'vcs_test_hg': u'repository.write'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_h)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_repo_in_group_permissions(self):
 
        self.g1 = _make_group('group1', skip_if_exists=True)
 
        self.g2 = _make_group('group2', skip_if_exists=True)
 
        Session().commit()
 
        # both perms should be read !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        #Change perms to none for both groups
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.anon,
 
                                                perm='group.none')
 
        ReposGroupModel().grant_user_permission(repos_group=self.g2,
 
                                                user=self.anon,
 
                                                perm='group.none')
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        # add repo to group
 
        name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
 
        self.test_repo = RepoModel().create_repo(
 
            repo_name=name,
 
            repo_type='hg',
 
            description='',
 
            repos_group=self.g1,
 
            owner=self.u1,
 
        )
 
        Session().commit()
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        #grant permission for u2 !
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.u2,
 
                                                perm='group.read')
 
        ReposGroupModel().grant_user_permission(repos_group=self.g2,
 
                                                user=self.u2,
 
                                                perm='group.read')
 
        Session().commit()
 
        self.assertNotEqual(self.u1, self.u2)
 
        #u1 and anon should have not change perms while u2 should !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        u2_auth = AuthUser(user_id=self.u2.user_id)
 
        self.assertEqual(u2_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
    def test_repo_group_user_as_user_group_member(self):
 
        # create Group1
 
        self.g1 = _make_group('group1', skip_if_exists=True)
 
        Session().commit()
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read'})
 

	
 
        # set default permission to none
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.anon,
 
                                                perm='group.none')
 
        # make group
 
        self.ug1 = UsersGroupModel().create('G1')
 
        # add user to group
 
        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
 
        Session().commit()
 

	
 
        # check if user is in the group
 
        membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
 
        self.assertEqual(membrs, [self.u1.user_id])
 
        # add some user to that group
 

	
 
        # check his permissions
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.none'})
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.none'})
 

	
 
        # grant ug1 read permissions for
 
        ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
 
                                                       group_name=self.ug1,
 
                                                       perm='group.read')
 
        Session().commit()
 
        # check if the
 
        obj = Session().query(UsersGroupRepoGroupToPerm)\
 
            .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
 
            .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
 
            .scalar()
 
        self.assertEqual(obj.permission.permission_name, 'group.read')
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.none'})
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read'})
 

	
 
    def test_inherited_permissions_from_default_on_user_enabled(self):
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 
        # make sure inherit flag is turned on
 
        self.u1.inherit_default_permissions = True
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited permissions from default user
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'repository.read', 'group.read']))
 

	
 
    def test_inherited_permissions_from_default_on_user_disabled(self):
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 
        # make sure inherit flag is turned on
 
        self.u1.inherit_default_permissions = True
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have inherited permissions from default user
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'repository.read', 'group.read']))
 

	
 
    def test_non_inherited_permissions_from_default_on_user_enabled(self):
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 

	
 
        #disable global perms on specific user
 
        user_model.revoke_perm(self.u1, 'hg.create.repository')
 
        user_model.grant_perm(self.u1, 'hg.create.none')
 
        user_model.revoke_perm(self.u1, 'hg.fork.repository')
 
        user_model.grant_perm(self.u1, 'hg.fork.none')
 

	
 
        # make sure inherit flag is turned off
 
        self.u1.inherit_default_permissions = False
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have non inherited permissions from he's
 
        # explicitly set permissions
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'repository.read', 'group.read']))
 

	
 
    def test_non_inherited_permissions_from_default_on_user_disabled(self):
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        #enable global perms on specific user
 
        user_model.revoke_perm(self.u1, 'hg.create.none')
 
        user_model.grant_perm(self.u1, 'hg.create.repository')
 
        user_model.revoke_perm(self.u1, 'hg.fork.none')
 
        user_model.grant_perm(self.u1, 'hg.fork.repository')
 

	
 
        # make sure inherit flag is turned off
 
        self.u1.inherit_default_permissions = False
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have non inherited permissions from he's
 
        # explicitly set permissions
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'repository.read', 'group.read']))
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_group(self):
 
        #create repo as USER,
 
        self.test_repo = repo = RepoModel().create_repo(repo_name='myownrepo',
 
                                repo_type='hg',
 
                                description='desc',
 
                                owner=self.u1)
 

	
 
        Session().commit()
 
        #he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 
        #set his permission as users group, he should still be admin
 
        #set his permission as user group, he should still be admin
 
        self.ug1 = UsersGroupModel().create('G1')
 
        # add user to group
 
        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
 
        RepoModel().grant_users_group_permission(repo, group_name=self.ug1,
 
                                                 perm='repository.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_others(self):
 
        #create repo as USER,
 
        self.test_repo = repo = RepoModel().create_repo(repo_name='myownrepo',
 
                                repo_type='hg',
 
                                description='desc',
 
                                owner=self.u1)
 

	
 
        Session().commit()
 
        #he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 
        #set his permission as user, he should still be admin
 
        RepoModel().grant_user_permission(repo, user=self.u1,
 
                                          perm='repository.none')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
rhodecode/tests/models/test_users.py
Show inline comments
 
import unittest
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\
 
    Permission
 
from rhodecode.model.user import UserModel
 

	
 
from rhodecode.model.meta import Session
 
from rhodecode.model.users_group import UsersGroupModel
 

	
 

	
 
class TestUser(unittest.TestCase):
 
    def __init__(self, methodName='runTest'):
 
        Session.remove()
 
        super(TestUser, self).__init__(methodName=methodName)
 

	
 
    def test_create_and_remove(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                     email=u'u232@rhodecode.org',
 
                                     firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        self.assertEqual(User.get_by_username(u'test_user'), usr)
 

	
 
        # make users group
 
        # make user group
 
        users_group = UsersGroupModel().create('some_example_group')
 
        Session().commit()
 

	
 
        UsersGroupModel().add_user_to_group(users_group, usr)
 
        Session().commit()
 

	
 
        self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group)
 
        self.assertEqual(UsersGroupMember.query().count(), 1)
 
        UserModel().delete(usr.user_id)
 
        Session().commit()
 

	
 
        self.assertEqual(UsersGroupMember.query().all(), [])
 

	
 
    def test_additonal_email_as_main(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                     email=u'main_email@rhodecode.org',
 
                                     firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 

	
 
        def do():
 
            m = UserEmailMap()
 
            m.email = u'main_email@rhodecode.org'
 
            m.user = usr
 
            Session().add(m)
 
            Session().commit()
 
        self.assertRaises(AttributeError, do)
 

	
 
        UserModel().delete(usr.user_id)
 
        Session().commit()
 

	
 
    def test_extra_email_map(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                     email=u'main_email@rhodecode.org',
 
                                     firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 

	
 
        m = UserEmailMap()
 
        m.email = u'main_email2@rhodecode.org'
 
        m.user = usr
 
        Session().add(m)
 
        Session().commit()
 

	
 
        u = User.get_by_email(email='main_email@rhodecode.org')
 
        self.assertEqual(usr.user_id, u.user_id)
 
        self.assertEqual(usr.username, u.username)
 

	
 
        u = User.get_by_email(email='main_email2@rhodecode.org')
 
        self.assertEqual(usr.user_id, u.user_id)
 
        self.assertEqual(usr.username, u.username)
 
        u = User.get_by_email(email='main_email3@rhodecode.org')
 
        self.assertEqual(None, u)
 

	
 
        UserModel().delete(usr.user_id)
 
        Session().commit()
 

	
 

	
 
class TestUsers(unittest.TestCase):
 

	
 
    def __init__(self, methodName='runTest'):
 
        super(TestUsers, self).__init__(methodName=methodName)
 

	
 
    def setUp(self):
 
        self.u1 = UserModel().create_or_update(username=u'u1',
 
                                        password=u'qweqwe',
 
                                        email=u'u1@rhodecode.org',
 
                                        firstname=u'u1', lastname=u'u1')
 

	
 
    def tearDown(self):
 
        perm = Permission.query().all()
 
        for p in perm:
 
            UserModel().revoke_perm(self.u1, p)
 

	
 
        UserModel().delete(self.u1)
 
        Session().commit()
 

	
 
    def test_add_perm(self):
 
        perm = Permission.query().all()[0]
 
        UserModel().grant_perm(self.u1, perm)
 
        Session().commit()
 
        self.assertEqual(UserModel().has_perm(self.u1, perm), True)
 

	
 
    def test_has_perm(self):
 
        perm = Permission.query().all()
 
        for p in perm:
 
            has_p = UserModel().has_perm(self.u1, p)
 
            self.assertEqual(False, has_p)
 

	
 
    def test_revoke_perm(self):
 
        perm = Permission.query().all()[0]
 
        UserModel().grant_perm(self.u1, perm)
 
        Session().commit()
 
        self.assertEqual(UserModel().has_perm(self.u1, perm), True)
 

	
 
        #revoke
 
        UserModel().revoke_perm(self.u1, perm)
 
        Session().commit()
 
        self.assertEqual(UserModel().has_perm(self.u1, perm), False)
0 comments (0 inline, 0 general)