Changeset - e04106e46d6f
[Not reviewed]
default
0 2 0
Thomas De Schampheleire - 11 years ago 2015-03-30 21:27:02
thomas.de.schampheleire@gmail.com
auth: return early in LoginRequired on API key validation

Simplify the logic in the LoginRequired decorator when Kallithea is accessed
using an API key. Either:
- the key is valid and API access is allowed for the accessed method
(continue), or
- the key is invalid (redirect to login page), or
- the accessed method does not allow API access (403 Forbidden)

In none of these cases does it make sense to continue checking for user
authentication, so return early.
2 files changed with 19 insertions and 21 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
@@ -563,421 +563,419 @@ class AuthUser(object):
 
            decides if explicit flag is turned off how to specify the permission
 
            for case when user is in a group + have defined separate permission
 
        """
 
        user_id = user.user_id
 
        user_is_admin = user.is_admin
 
        user_inherit_default_permissions = user.inherit_default_permissions
 

	
 
        log.debug('Getting PERMISSION tree')
 
        compute = conditional_cache('short_term', 'cache_desc',
 
                                    condition=cache, func=_cached_perms_data)
 
        return compute(user_id, user_is_admin,
 
                       user_inherit_default_permissions, explicit, algo)
 

	
 
    def get_api_keys(self):
 
        api_keys = [self.api_key]
 
        for api_key in UserApiKeys.query()\
 
                .filter(UserApiKeys.user_id == self.user_id)\
 
                .filter(or_(UserApiKeys.expires == -1,
 
                            UserApiKeys.expires >= time.time())).all():
 
            api_keys.append(api_key.api_key)
 

	
 
        return api_keys
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @property
 
    def repositories_admin(self):
 
        """
 
        Returns list of repositories you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories'].iteritems()
 
                if x[1] == 'repository.admin']
 

	
 
    @property
 
    def repository_groups_admin(self):
 
        """
 
        Returns list of repository groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories_groups'].iteritems()
 
                if x[1] == 'group.admin']
 

	
 
    @property
 
    def user_groups_admin(self):
 
        """
 
        Returns list of user groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['user_groups'].iteritems()
 
                if x[1] == 'usergroup.admin']
 

	
 
    @property
 
    def ip_allowed(self):
 
        """
 
        Checks if ip_addr used in constructor is allowed from defined list of
 
        allowed ip_addresses for user
 

	
 
        :returns: boolean, True if ip is in allowed ip range
 
        """
 
        # check IP
 
        inherit = self.inherit_default_permissions
 
        return AuthUser.check_ip_allowed(self.user_id, self.ip_addr,
 
                                         inherit_from_default=inherit)
 

	
 
    @classmethod
 
    def check_ip_allowed(cls, user_id, ip_addr, inherit_from_default):
 
        allowed_ips = AuthUser.get_allowed_ips(user_id, cache=True,
 
                        inherit_from_default=inherit_from_default)
 
        if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
 
            log.debug('IP:%s is in range of %s' % (ip_addr, allowed_ips))
 
            return True
 
        else:
 
            log.info('Access for IP:%s forbidden, '
 
                     'not in %s' % (ip_addr, allowed_ips))
 
            return False
 

	
 
    def __repr__(self):
 
        return "<AuthUser('id:%s[%s] ip:%s auth:%s')>"\
 
            % (self.user_id, self.username, self.ip_addr, self.is_authenticated)
 

	
 
    def set_authenticated(self, authenticated=True):
 
        if self.user_id != self.anonymous_user.user_id:
 
            self.is_authenticated = authenticated
 

	
 
    def get_cookie_store(self):
 
        return {'username': self.username,
 
                'user_id': self.user_id,
 
                'is_authenticated': self.is_authenticated}
 

	
 
    @classmethod
 
    def from_cookie_store(cls, cookie_store):
 
        """
 
        Creates AuthUser from a cookie store
 

	
 
        :param cls:
 
        :param cookie_store:
 
        """
 
        user_id = cookie_store.get('user_id')
 
        username = cookie_store.get('username')
 
        api_key = cookie_store.get('api_key')
 
        return AuthUser(user_id, api_key, username)
 

	
 
    @classmethod
 
    def get_allowed_ips(cls, user_id, cache=False, inherit_from_default=False):
 
        _set = set()
 

	
 
        if inherit_from_default:
 
            default_ips = UserIpMap.query().filter(UserIpMap.user ==
 
                                            User.get_default_user(cache=True))
 
            if cache:
 
                default_ips = default_ips.options(FromCache("sql_cache_short",
 
                                                  "get_user_ips_default"))
 

	
 
            # populate from default user
 
            for ip in default_ips:
 
                try:
 
                    _set.add(ip.ip_addr)
 
                except ObjectDeletedError:
 
                    # since we use heavy caching sometimes it happens that we get
 
                    # deleted objects here, we just skip them
 
                    pass
 

	
 
        user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
 
        if cache:
 
            user_ips = user_ips.options(FromCache("sql_cache_short",
 
                                                  "get_user_ips_%s" % user_id))
 

	
 
        for ip in user_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 
        return _set or set(['0.0.0.0/0', '::/0'])
 

	
 

	
 
def set_available_permissions(config):
 
    """
 
    This function will propagate pylons globals with all available defined
 
    permission given in db. We don't want to check each time from db for new
 
    permissions since adding a new permission also requires application restart
 
    ie. to decorate new views with the newly created permission
 

	
 
    :param config: current pylons config instance
 

	
 
    """
 
    log.info('getting information about all available permissions')
 
    try:
 
        sa = meta.Session
 
        all_perms = sa.query(Permission).all()
 
        config['available_permissions'] = [x.permission_name for x in all_perms]
 
    finally:
 
        meta.Session.remove()
 

	
 

	
 
#==============================================================================
 
# CHECK DECORATORS
 
#==============================================================================
 

	
 
def redirect_to_login(message=None):
 
    from kallithea.lib import helpers as h
 
    p = url.current()
 
    h.flash(h.literal(message), category='warning')
 
    log.debug('Redirecting to login page, origin: %s' % p)
 
    return redirect(url('login_home', came_from=p))
 

	
 
class LoginRequired(object):
 
    """
 
    Must be logged in to execute this function else
 
    redirect to login page
 

	
 
    :param api_access: if enabled this checks only for valid auth token
 
        and grants access based on valid token
 
    """
 

	
 
    def __init__(self, api_access=False):
 
        self.api_access = api_access
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        user = cls.authuser
 
        loc = "%s:%s" % (cls.__class__.__name__, func.__name__)
 
        log.debug('Checking access for user %s @ %s' % (user, loc))
 

	
 
        # check if our IP is allowed
 
        if not user.ip_allowed:
 
            return redirect_to_login(_('IP %s not allowed' % (user.ip_addr)))
 

	
 
        # check if we used an APIKEY and it's a valid one
 
        # defined whitelist of controllers which API access will be enabled
 
        _api_key = request.GET.get('api_key', '')
 
        api_access_valid = allowed_api_access(loc, api_key=_api_key)
 

	
 
        # explicit controller is enabled or API is in our whitelist
 
        if self.api_access or api_access_valid:
 
            log.debug('Checking API KEY access for %s' % cls)
 
            if _api_key and _api_key in user.api_keys:
 
                api_access_valid = True
 
                log.debug('API KEY ****%s is VALID' % _api_key[-4:])
 
        # check if we used an API key and it's a valid one
 
        api_key = request.GET.get('api_key')
 
        if api_key is not None:
 
            # explicit controller is enabled or API is in our whitelist
 
            if self.api_access or allowed_api_access(loc, api_key=api_key):
 
                if api_key in user.api_keys:
 
                    log.info('user %s authenticated with API key ****%s @ %s'
 
                             % (user, api_key[-4:], loc))
 
                    return func(*fargs, **fkwargs)
 
                else:
 
                    log.warning('API key ****%s is NOT valid' % api_key[-4:])
 
                    return redirect_to_login(_('Invalid API key'))
 
            else:
 
                api_access_valid = False
 
                if not _api_key:
 
                    log.debug("API KEY *NOT* present in request")
 
                else:
 
                    log.warning("API KEY ****%s *NOT* valid" % _api_key[-4:])
 
                # controller does not allow API access
 
                log.warning('API access to %s is not allowed' % loc)
 
                return abort(403)
 

	
 
        # CSRF protection - POSTs with session auth must contain correct token
 
        if request.POST and user.is_authenticated and not api_access_valid:
 
        if request.POST and user.is_authenticated:
 
            token = request.POST.get(secure_form.token_key)
 
            if not token or token != secure_form.authentication_token():
 
                log.error('CSRF check failed')
 
                return abort(403)
 

	
 
        log.debug('Checking if %s is authenticated @ %s' % (user.username, loc))
 
        reason = 'RegularAuth' if user.is_authenticated else 'APIAuth'
 

	
 
        if user.is_authenticated or api_access_valid:
 
        if user.is_authenticated:
 
            log.info('user %s authenticating with:%s IS authenticated on func %s '
 
                     % (user, reason, loc)
 
            )
 
            return func(*fargs, **fkwargs)
 
        else:
 
            log.warning('user %s authenticating with:%s NOT authenticated on func: %s: '
 
                     'API_ACCESS:%s'
 
                     % (user, reason, loc, api_access_valid)
 
                     % (user, reason, loc)
 
            )
 
            return redirect_to_login()
 

	
 
class NotAnonymous(object):
 
    """
 
    Must be logged in to execute this function else
 
    redirect to login page"""
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        self.user = cls.authuser
 

	
 
        log.debug('Checking if user is not anonymous @%s' % cls)
 

	
 
        anonymous = self.user.username == User.DEFAULT_USER
 

	
 
        if anonymous:
 
            return redirect_to_login(_('You need to be a registered user to '
 
                    'perform this action'))
 
        else:
 
            return func(*fargs, **fkwargs)
 

	
 

	
 
class PermsDecorator(object):
 
    """Base class for controller decorators"""
 

	
 
    def __init__(self, *required_perms):
 
        self.required_perms = set(required_perms)
 
        self.user_perms = None
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        self.user = cls.authuser
 
        self.user_perms = self.user.permissions
 
        log.debug('checking %s permissions %s for %s %s',
 
           self.__class__.__name__, self.required_perms, cls, self.user)
 

	
 
        if self.check_permissions():
 
            log.debug('Permission granted for %s %s' % (cls, self.user))
 
            return func(*fargs, **fkwargs)
 

	
 
        else:
 
            log.debug('Permission denied for %s %s' % (cls, self.user))
 
            anonymous = self.user.username == User.DEFAULT_USER
 

	
 
            if anonymous:
 
                return redirect_to_login(_('You need to be signed in to view this page'))
 
            else:
 
                # redirect with forbidden ret code
 
                return abort(403)
 

	
 
    def check_permissions(self):
 
        """Dummy function for overriding"""
 
        raise Exception('You have to write this function in child class')
 

	
 

	
 
class HasPermissionAllDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for all given predicates. All of them
 
    have to be meet in order to fulfill the request
 
    """
 

	
 
    def check_permissions(self):
 
        if self.required_perms.issubset(self.user_perms.get('global')):
 
            return True
 
        return False
 

	
 

	
 
class HasPermissionAnyDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for any of given predicates. In order to
 
    fulfill the request any of predicates must be meet
 
    """
 

	
 
    def check_permissions(self):
 
        if self.required_perms.intersection(self.user_perms.get('global')):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoPermissionAllDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for all given predicates for specific
 
    repository. All of them have to be meet in order to fulfill the request
 
    """
 

	
 
    def check_permissions(self):
 
        repo_name = get_repo_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['repositories'][repo_name]])
 
        except KeyError:
 
            return False
 
        if self.required_perms.issubset(user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoPermissionAnyDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for any of given predicates for specific
 
    repository. In order to fulfill the request any of predicates must be meet
 
    """
 

	
 
    def check_permissions(self):
 
        repo_name = get_repo_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['repositories'][repo_name]])
 
        except KeyError:
 
            return False
 

	
 
        if self.required_perms.intersection(user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoGroupPermissionAllDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for all given predicates for specific
 
    repository group. All of them have to be meet in order to fulfill the request
 
    """
 

	
 
    def check_permissions(self):
 
        group_name = get_repo_group_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['repositories_groups'][group_name]])
 
        except KeyError:
 
            return False
 

	
 
        if self.required_perms.issubset(user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasRepoGroupPermissionAnyDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for any of given predicates for specific
 
    repository group. In order to fulfill the request any of predicates must be meet
 
    """
 

	
 
    def check_permissions(self):
 
        group_name = get_repo_group_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['repositories_groups'][group_name]])
 
        except KeyError:
 
            return False
 

	
 
        if self.required_perms.intersection(user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasUserGroupPermissionAllDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for all given predicates for specific
 
    user group. All of them have to be meet in order to fulfill the request
 
    """
 

	
 
    def check_permissions(self):
 
        group_name = get_user_group_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['user_groups'][group_name]])
 
        except KeyError:
 
            return False
 

	
 
        if self.required_perms.issubset(user_perms):
 
            return True
 
        return False
 

	
 

	
 
class HasUserGroupPermissionAnyDecorator(PermsDecorator):
 
    """
 
    Checks for access permission for any of given predicates for specific
 
    user group. In order to fulfill the request any of predicates must be meet
 
    """
 

	
 
    def check_permissions(self):
 
        group_name = get_user_group_slug(request)
 
        try:
 
            user_perms = set([self.user_perms['user_groups'][group_name]])
 
        except KeyError:
 
            return False
 

	
 
        if self.required_perms.intersection(user_perms):
 
            return True
 
        return False
 

	
kallithea/tests/functional/test_login.py
Show inline comments
 
@@ -130,246 +130,246 @@ class TestLoginController(TestController
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email_case_sensitive(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'test_admin_1',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'TesT_Admin@mail.COM',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 
        msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_wrong_data(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': 'test',
 
                                             'password_confirmation': 'test',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Enter a value 6 characters long or more')
 

	
 
    def test_register_err_username(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'error user',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Username may only contain '
 
                'alphanumeric characters underscores, '
 
                'periods or dashes and must begin with '
 
                'alphanumeric character')
 

	
 
    def test_register_err_case_sensitive(self):
 
        usr = 'Test_Admin'
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': usr,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': usr})
 
        response.mustcontain(msg)
 

	
 
    def test_register_special_chars(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                        {'username': 'xxxaxn',
 
                                         'password': 'ąćźżąśśśś',
 
                                         'password_confirmation': 'ąćźżąśśśś',
 
                                         'email': 'goodmailm@test.plx',
 
                                         'firstname': 'test',
 
                                         'lastname': 'test'})
 

	
 
        msg = validators.ValidPassword()._messages['invalid_password']
 
        response.mustcontain(msg)
 

	
 
    def test_register_password_mismatch(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': 'xs',
 
                                             'password': '123qwe',
 
                                             'password_confirmation': 'qwe123',
 
                                             'email': 'goodmailm@test.plxa',
 
                                             'firstname': 'test',
 
                                             'lastname': 'test'})
 
        msg = validators.ValidPasswordsMatch()._messages['password_mismatch']
 
        response.mustcontain(msg)
 

	
 
    def test_register_ok(self):
 
        username = 'test_regular4'
 
        password = 'qweqwe'
 
        email = 'username@test.com'
 
        name = 'testname'
 
        lastname = 'testlastname'
 

	
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username': username,
 
                                             'password': password,
 
                                             'password_confirmation': password,
 
                                             'email': email,
 
                                             'firstname': name,
 
                                             'lastname': lastname,
 
                                             'admin': True})  # This should be overriden
 
        self.assertEqual(response.status, '302 Found')
 
        self.checkSessionFlash(response, 'You have successfully registered into Kallithea')
 

	
 
        ret = Session().query(User).filter(User.username == 'test_regular4').one()
 
        self.assertEqual(ret.username, username)
 
        self.assertEqual(check_password(password, ret.password), True)
 
        self.assertEqual(ret.email, email)
 
        self.assertEqual(ret.name, name)
 
        self.assertEqual(ret.lastname, lastname)
 
        self.assertNotEqual(ret.api_key, None)
 
        self.assertEqual(ret.admin, False)
 

	
 
    def test_forgot_password_wrong_mail(self):
 
        bad_email = 'username@wrongmail.org'
 
        response = self.app.post(
 
                        url(controller='login', action='password_reset'),
 
                            {'email': bad_email, }
 
        )
 

	
 
        msg = validators.ValidSystemEmail()._messages['non_existing_email']
 
        msg = h.html_escape(msg % {'email': bad_email})
 
        response.mustcontain()
 

	
 
    def test_forgot_password(self):
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset'))
 
        self.assertEqual(response.status, '200 OK')
 

	
 
        username = 'test_password_reset_1'
 
        password = 'qweqwe'
 
        email = 'username@python-works.com'
 
        name = 'passwd'
 
        lastname = 'reset'
 

	
 
        new = User()
 
        new.username = username
 
        new.password = password
 
        new.email = email
 
        new.name = name
 
        new.lastname = lastname
 
        new.api_key = generate_api_key(username)
 
        Session().add(new)
 
        Session().commit()
 

	
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset'),
 
                                 {'email': email, })
 

	
 
        self.checkSessionFlash(response, 'Your password reset link was sent')
 

	
 
        response = response.follow()
 

	
 
        # BAD KEY
 

	
 
        key = "bad"
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset_confirmation',
 
                                    key=key))
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertTrue(response.location.endswith(url('reset_password')))
 

	
 
        # GOOD KEY
 

	
 
        key = User.get_by_username(username).api_key
 
        response = self.app.get(url(controller='login',
 
                                    action='password_reset_confirmation',
 
                                    key=key))
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertTrue(response.location.endswith(url('login_home')))
 

	
 
        self.checkSessionFlash(response,
 
                               ('Your password reset was successful, '
 
                                'new password has been sent to your email'))
 

	
 
        response = response.follow()
 

	
 
    def _get_api_whitelist(self, values=None):
 
        config = {'api_access_controllers_whitelist': values or []}
 
        return config
 

	
 
    @parameterized.expand([
 
        ('none', None),
 
        ('empty_string', ''),
 
        ('fake_number', '123456'),
 
        ('proper_api_key', None)
 
    ])
 
    def test_access_not_whitelisted_page_via_api_key(self, test_name, api_key):
 
        whitelist = self._get_api_whitelist([])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual([],
 
                             whitelist['api_access_controllers_whitelist'])
 
            if test_name == 'proper_api_key':
 
                #use builtin if api_key is None
 
                api_key = User.get_first_admin().api_key
 

	
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=api_key),
 
                             status=302)
 
                             status=403)
 

	
 
    @parameterized.expand([
 
        ('none', None, 302),
 
        ('empty_string', '', 302),
 
        ('fake_number', '123456', 302),
 
        ('proper_api_key', None, 200)
 
    ])
 
    def test_access_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 
            if test_name == 'proper_api_key':
 
                api_key = User.get_first_admin().api_key
 

	
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=api_key),
 
                             status=code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip', api_key=new_api_key.api_key),
 
                             status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            self.assertEqual(['ChangesetController:changeset_raw'],
 
                             whitelist['api_access_controllers_whitelist'])
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            #patch the api key and make it expired
 
            new_api_key.expires = 0
 
            Session().add(new_api_key)
 
            Session().commit()
 
            with fixture.anon_access(False):
 
                self.app.get(url(controller='changeset',
 
                                 action='changeset_raw',
 
                                 repo_name=HG_REPO, revision='tip',
 
                                 api_key=new_api_key.api_key),
 
                             status=302)
0 comments (0 inline, 0 general)