Changeset - 0e3e0864f210
[Not reviewed]
default
0 7 0
Mads Kiilerich - 7 years ago 2019-01-03 01:16:36
mads@kiilerich.com
auth: drop api_access_controllers_whitelist and give API key auth same access as other kinds of auth

All authentication methods are created equal. There is no point in
discriminating api key authentication and limit it to few APIs.
7 files changed with 22 insertions and 121 deletions:
0 comments (0 inline, 0 general)
development.ini
Show inline comments
 
@@ -132,60 +132,48 @@ git_path = git
 
## git rev filter option, --all is the default filter, if you need to
 
## hide all refs in changelog switch this to --branches --tags
 
#git_rev_filter = --branches --tags
 

	
 
## RSS feed options
 
rss_cut_off_limit = 256000
 
rss_items_per_page = 10
 
rss_include_diff = false
 

	
 
## options for showing and identifying changesets
 
show_sha_length = 12
 
show_revision_number = false
 

	
 
## Canonical URL to use when creating full URLs in UI and texts.
 
## Useful when the site is available under different names or protocols.
 
## Defaults to what is provided in the WSGI environment.
 
#canonical_url = https://kallithea.example.com/repos
 

	
 
## gist URL alias, used to create nicer urls for gist. This should be an
 
## url that does rewrites to _admin/gists/<gistid>.
 
## example: http://gist.example.com/{gistid}. Empty means use the internal
 
## Kallithea url, ie. http[s]://kallithea.example.com/_admin/gists/<gistid>
 
gist_alias_url =
 

	
 
## white list of API enabled controllers. This allows to add list of
 
## controllers to which access will be enabled by api_key. eg: to enable
 
## api access to raw_files put `FilesController:raw`, to enable access to patches
 
## add `ChangesetController:changeset_patch`. This list should be "," separated
 
## Syntax is <ControllerClass>:<function>. Check debug logs for generated names
 
## Recommended settings below are commented out:
 
api_access_controllers_whitelist =
 
#    ChangesetController:changeset_patch,
 
#    ChangesetController:changeset_raw,
 
#    FilesController:raw,
 
#    FilesController:archivefile
 

	
 
## default encoding used to convert from and to unicode
 
## can be also a comma separated list of encoding in case of mixed encodings
 
default_encoding = utf-8
 

	
 
## Set Mercurial encoding, similar to setting HGENCODING before launching Kallithea
 
hgencoding = utf-8
 

	
 
## issue tracker for Kallithea (leave blank to disable, absent for default)
 
#bugtracker = https://bitbucket.org/conservancy/kallithea/issues
 

	
 
## issue tracking mapping for commit messages, comments, PR descriptions, ...
 
## Refer to the documentation ("Integration with issue trackers") for more details.
 

	
 
## regular expression to match issue references
 
## This pattern may/should contain parenthesized groups, that can
 
## be referred to in issue_server_link or issue_sub using Python backreferences
 
## (e.g. \1, \2, ...). You can also create named groups with '(?P<groupname>)'.
 
## To require mandatory whitespace before the issue pattern, use:
 
## (?:^|(?<=\s)) before the actual pattern, and for mandatory whitespace
 
## behind the issue pattern, use (?:$|(?=\s)) after the actual pattern.
 

	
 
issue_pat = #(\d+)
 

	
 
## server url to the issue
docs/api/api.rst
Show inline comments
 
@@ -1217,51 +1217,35 @@ Add comment, change status or close a gi
 
using the api_key of a user with read permissions to the original repository.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method  : "comment_pullrequest"
 
    args:     {
 
                "pull_request_id":  "<pull_request_id>",
 
                "comment_msg":      Optional(''),
 
                "status":           Optional(None),     # "under_review", "approved" or "rejected"
 
                "close_pr":         Optional(False)",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: True
 
    error:  null
 

	
 

	
 
API access for web views
 
------------------------
 

	
 
API access can also be turned on for each web view in Kallithea that is
 
decorated with the ``@LoginRequired`` decorator. Some views use
 
``@LoginRequired(api_access=True)`` and are always available. By default only
 
RSS/Atom feed views are enabled. Other views are
 
only available if they have been whitelisted. Edit the
 
``api_access_controllers_whitelist`` option in your .ini file and define views
 
that should have API access enabled.
 

	
 
For example, to enable API access to patch/diff, raw file and archive::
 

	
 
    api_access_controllers_whitelist =
 
        ChangesetController:changeset_patch,
 
        ChangesetController:changeset_raw,
 
        FilesController:raw,
 
        FilesController:archivefile
 

	
 
After this change, a Kallithea view can be accessed without login using
 
bearer authentication, by including this header with the request::
 
Kallithea HTTP entry points can also be accessed without login using bearer
 
authentication by including this header with the request::
 

	
 
    Authentication: Bearer <api_key>
 

	
 
Alternatively, the API key can be passed in the URL query string using
 
``?api_key=<api_key>``, though this is not recommended due to the increased
 
risk of API key leaks, and support will likely be removed in the future.
 

	
 
Exposing raw diffs is a good way to integrate with
 
third-party services like code review, or build farms that can download archives.
kallithea/controllers/feed.py
Show inline comments
 
@@ -30,49 +30,49 @@ import logging
 

	
 
from tg import response, tmpl_context as c
 
from tg.i18n import ugettext as _
 

	
 
from beaker.cache import cache_region, region_invalidate
 
from webhelpers.feedgenerator import Atom1Feed, Rss201rev2Feed
 

	
 
from kallithea import CONFIG
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import LoginRequired, HasRepoPermissionLevelDecorator
 
from kallithea.lib.base import BaseRepoController
 
from kallithea.lib.diffs import DiffProcessor
 
from kallithea.model.db import CacheInvalidation
 
from kallithea.lib.utils2 import safe_int, str2bool, safe_unicode
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
language = 'en-us'
 
ttl = "5"
 

	
 

	
 
class FeedController(BaseRepoController):
 

	
 
    @LoginRequired(api_access=True, allow_default_user=True)
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def _before(self, *args, **kwargs):
 
        super(FeedController, self)._before(*args, **kwargs)
 

	
 
    def _get_title(self, cs):
 
        return h.shorter(cs.message, 160)
 

	
 
    def __get_desc(self, cs):
 
        desc_msg = [(_('%s committed on %s')
 
                     % (h.person(cs.author), h.fmt_date(cs.date))) + '<br/>']
 
        # branches, tags, bookmarks
 
        for branch in cs.branches:
 
            desc_msg.append('branch: %s<br/>' % branch)
 
        for book in cs.bookmarks:
 
            desc_msg.append('bookmark: %s<br/>' % book)
 
        for tag in cs.tags:
 
            desc_msg.append('tag: %s<br/>' % tag)
 

	
 
        changes = []
 
        diff_limit = safe_int(CONFIG.get('rss_cut_off_limit', 32 * 1024))
 
        raw_diff = cs.diff()
 
        diff_processor = DiffProcessor(raw_diff,
 
                                       diff_limit=diff_limit,
 
                                       inline_diff=False)
kallithea/controllers/journal.py
Show inline comments
 
@@ -199,60 +199,60 @@ class JournalController(BaseController):
 
            .filter(UserFollowing.user_id == request.authuser.user_id) \
 
            .options(joinedload(UserFollowing.follows_repository)) \
 
            .all()
 

	
 
        journal = self._get_journal_data(c.following)
 

	
 
        def url_generator(**kw):
 
            return url.current(filter=c.search_term, **kw)
 

	
 
        c.journal_pager = Page(journal, page=p, items_per_page=20, url=url_generator)
 
        c.journal_day_aggregate = self._get_daily_aggregate(c.journal_pager)
 

	
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            return render('journal/journal_data.html')
 

	
 
        repos_list = Repository.query(sorted=True) \
 
            .filter_by(owner_id=request.authuser.user_id).all()
 

	
 
        repos_data = RepoModel().get_repos_as_dict(repos_list, admin=True)
 
        # data used to render the grid
 
        c.data = repos_data
 

	
 
        return render('journal/journal.html')
 

	
 
    @LoginRequired(api_access=True)
 
    @LoginRequired()
 
    def journal_atom(self):
 
        """
 
        Produce an atom-1.0 feed via feedgenerator module
 
        """
 
        following = UserFollowing.query() \
 
            .filter(UserFollowing.user_id == request.authuser.user_id) \
 
            .options(joinedload(UserFollowing.follows_repository)) \
 
            .all()
 
        return self._atom_feed(following, public=False)
 

	
 
    @LoginRequired(api_access=True)
 
    @LoginRequired()
 
    def journal_rss(self):
 
        """
 
        Produce an rss feed via feedgenerator module
 
        """
 
        following = UserFollowing.query() \
 
            .filter(UserFollowing.user_id == request.authuser.user_id) \
 
            .options(joinedload(UserFollowing.follows_repository)) \
 
            .all()
 
        return self._rss_feed(following, public=False)
 

	
 
    @LoginRequired()
 
    def toggle_following(self):
 
        user_id = request.POST.get('follows_user_id')
 
        if user_id:
 
            try:
 
                self.scm_model.toggle_following_user(user_id,
 
                                            request.authuser.user_id)
 
                Session().commit()
 
                return 'ok'
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise HTTPBadRequest()
 

	
 
        repo_id = request.POST.get('follows_repository_id')
 
@@ -268,47 +268,47 @@ class JournalController(BaseController):
 

	
 
        raise HTTPBadRequest()
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def public_journal(self):
 
        # Return a rendered template
 
        p = safe_int(request.GET.get('page'), 1)
 

	
 
        c.following = UserFollowing.query() \
 
            .filter(UserFollowing.user_id == request.authuser.user_id) \
 
            .options(joinedload(UserFollowing.follows_repository)) \
 
            .all()
 

	
 
        journal = self._get_journal_data(c.following)
 

	
 
        c.journal_pager = Page(journal, page=p, items_per_page=20)
 

	
 
        c.journal_day_aggregate = self._get_daily_aggregate(c.journal_pager)
 

	
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            return render('journal/journal_data.html')
 

	
 
        return render('journal/public_journal.html')
 

	
 
    @LoginRequired(api_access=True, allow_default_user=True)
 
    @LoginRequired(allow_default_user=True)
 
    def public_journal_atom(self):
 
        """
 
        Produce an atom-1.0 feed via feedgenerator module
 
        """
 
        c.following = UserFollowing.query() \
 
            .filter(UserFollowing.user_id == request.authuser.user_id) \
 
            .options(joinedload(UserFollowing.follows_repository)) \
 
            .all()
 

	
 
        return self._atom_feed(c.following)
 

	
 
    @LoginRequired(api_access=True, allow_default_user=True)
 
    @LoginRequired(allow_default_user=True)
 
    def public_journal_rss(self):
 
        """
 
        Produce an rss2 feed via feedgenerator module
 
        """
 
        c.following = UserFollowing.query() \
 
            .filter(UserFollowing.user_id == request.authuser.user_id) \
 
            .options(joinedload(UserFollowing.follows_repository)) \
 
            .all()
 

	
 
        return self._rss_feed(c.following)
kallithea/lib/auth.py
Show inline comments
 
@@ -346,70 +346,48 @@ def _cached_perms_data(user_id, user_is_
 
            == Permission.permission_id)) \
 
     .join((UserGroupMember, UserGroupUserGroupToPerm.user_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .join((UserGroup, UserGroupMember.users_group_id ==
 
            UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .all()
 

	
 
    for perm in user_group_user_groups_perms:
 
        bump_permission(UK,
 
            perm.UserGroupUserGroupToPerm.target_user_group.users_group_name,
 
            perm.Permission.permission_name)
 

	
 
    # user explicit permission for user groups
 
    user_user_groups_perms = Permission.get_default_user_group_perms(user_id)
 
    for perm in user_user_groups_perms:
 
        bump_permission(UK,
 
            perm.UserUserGroupToPerm.user_group.users_group_name,
 
            perm.Permission.permission_name)
 

	
 
    return permissions
 

	
 

	
 
def allowed_api_access(controller_name, whitelist=None, api_key=None):
 
    """
 
    Check if given controller_name is in whitelist API access
 
    """
 
    if not whitelist:
 
        from kallithea import CONFIG
 
        whitelist = aslist(CONFIG.get('api_access_controllers_whitelist'),
 
                           sep=',')
 
        log.debug('whitelist of API access is: %s', whitelist)
 
    api_access_valid = controller_name in whitelist
 
    if api_access_valid:
 
        log.debug('controller:%s is in API whitelist', controller_name)
 
    else:
 
        msg = 'controller: %s is *NOT* in API whitelist' % (controller_name)
 
        if api_key:
 
            # if we use API key and don't have access it's a warning
 
            log.warning(msg)
 
        else:
 
            log.debug(msg)
 
    return api_access_valid
 

	
 

	
 
class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
    but is also used as a generic user information data structure in
 
    parts of the code, e.g. user management.
 

	
 
    Constructed from a database `User` object, a user ID or cookie dict,
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users and the constructor
 
    sets the `is_authenticated` field to False. It's up to other parts
 
    of the code to check e.g. if a supplied password is correct, and if
 
    so, set `is_authenticated` to True.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
@@ -668,77 +646,67 @@ def set_available_permissions(config):
 
#==============================================================================
 
# CHECK DECORATORS
 
#==============================================================================
 

	
 
def _redirect_to_login(message=None):
 
    """Return an exception that must be raised. It will redirect to the login
 
    page which will redirect back to the current URL after authentication.
 
    The optional message will be shown in a flash message."""
 
    from kallithea.lib import helpers as h
 
    if message:
 
        h.flash(message, category='warning')
 
    p = request.path_qs
 
    log.debug('Redirecting to login page, origin: %s', p)
 
    return HTTPFound(location=url('login_home', came_from=p))
 

	
 

	
 
# Use as decorator
 
class LoginRequired(object):
 
    """Client must be logged in as a valid User, or we'll redirect to the login
 
    page.
 

	
 
    If the "default" user is enabled and allow_default_user is true, that is
 
    considered valid too.
 

	
 
    Also checks that IP address is allowed, and if using API key instead
 
    of regular cookie authentication, checks that API key access is allowed
 
    (based on `api_access` parameter and the API view whitelist).
 
    Also checks that IP address is allowed.
 
    """
 

	
 
    def __init__(self, api_access=False, allow_default_user=False):
 
        self.api_access = api_access
 
    def __init__(self, allow_default_user=False):
 
        self.allow_default_user = allow_default_user
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        controller = fargs[0]
 
        user = request.authuser
 
        loc = "%s:%s" % (controller.__class__.__name__, func.__name__)
 
        log.debug('Checking access for user %s @ %s', user, loc)
 

	
 
        # Check if we used an API key to authenticate.
 
        api_key = user.authenticating_api_key
 
        if api_key is not None:
 
            # Check that controller is enabled for API key usage.
 
            if not self.api_access and not allowed_api_access(loc, api_key=api_key):
 
                # controller does not allow API access
 
                log.warning('API access to %s is not allowed', loc)
 
                raise HTTPForbidden()
 

	
 
        if user.authenticating_api_key is not None:
 
            log.info('user %s authenticated with API key ****%s @ %s',
 
                     user, api_key[-4:], loc)
 
                     user, user.authenticating_api_key[-4:], loc)
 
            return func(*fargs, **fkwargs)
 

	
 
        # CSRF protection: Whenever a request has ambient authority (whether
 
        # through a session cookie or its origin IP address), it must include
 
        # the correct token, unless the HTTP method is GET or HEAD (and thus
 
        # guaranteed to be side effect free. In practice, the only situation
 
        # where we allow side effects without ambient authority is when the
 
        # authority comes from an API key; and that is handled above.
 
        if request.method not in ['GET', 'HEAD']:
 
            token = request.POST.get(secure_form.token_key)
 
            if not token or token != secure_form.authentication_token():
 
                log.error('CSRF check failed')
 
                raise HTTPForbidden()
 

	
 
        # regular user authentication
 
        if user.is_authenticated:
 
            log.info('user %s authenticated with regular auth @ %s', user, loc)
 
            return func(*fargs, **fkwargs)
 
        elif user.is_default_user:
 
            if self.allow_default_user:
 
                log.info('default user @ %s', loc)
 
                return func(*fargs, **fkwargs)
 
            log.info('default user is not accepted here @ %s', loc)
 
        else:
kallithea/lib/paster_commands/template.ini.mako
Show inline comments
 
@@ -229,60 +229,48 @@ git_path = git
 
<%text>## git rev filter option, --all is the default filter, if you need to</%text>
 
<%text>## hide all refs in changelog switch this to --branches --tags</%text>
 
#git_rev_filter = --branches --tags
 

	
 
<%text>## RSS feed options</%text>
 
rss_cut_off_limit = 256000
 
rss_items_per_page = 10
 
rss_include_diff = false
 

	
 
<%text>## options for showing and identifying changesets</%text>
 
show_sha_length = 12
 
show_revision_number = false
 

	
 
<%text>## Canonical URL to use when creating full URLs in UI and texts.</%text>
 
<%text>## Useful when the site is available under different names or protocols.</%text>
 
<%text>## Defaults to what is provided in the WSGI environment.</%text>
 
#canonical_url = https://kallithea.example.com/repos
 

	
 
<%text>## gist URL alias, used to create nicer urls for gist. This should be an</%text>
 
<%text>## url that does rewrites to _admin/gists/<gistid>.</%text>
 
<%text>## example: http://gist.example.com/{gistid}. Empty means use the internal</%text>
 
<%text>## Kallithea url, ie. http[s]://kallithea.example.com/_admin/gists/<gistid></%text>
 
gist_alias_url =
 

	
 
<%text>## white list of API enabled controllers. This allows to add list of</%text>
 
<%text>## controllers to which access will be enabled by api_key. eg: to enable</%text>
 
<%text>## api access to raw_files put `FilesController:raw`, to enable access to patches</%text>
 
<%text>## add `ChangesetController:changeset_patch`. This list should be "," separated</%text>
 
<%text>## Syntax is <ControllerClass>:<function>. Check debug logs for generated names</%text>
 
<%text>## Recommended settings below are commented out:</%text>
 
api_access_controllers_whitelist =
 
#    ChangesetController:changeset_patch,
 
#    ChangesetController:changeset_raw,
 
#    FilesController:raw,
 
#    FilesController:archivefile
 

	
 
<%text>## default encoding used to convert from and to unicode</%text>
 
<%text>## can be also a comma separated list of encoding in case of mixed encodings</%text>
 
default_encoding = utf-8
 

	
 
<%text>## Set Mercurial encoding, similar to setting HGENCODING before launching Kallithea</%text>
 
hgencoding = utf-8
 

	
 
<%text>## issue tracker for Kallithea (leave blank to disable, absent for default)</%text>
 
#bugtracker = https://bitbucket.org/conservancy/kallithea/issues
 

	
 
<%text>## issue tracking mapping for commit messages, comments, PR descriptions, ...</%text>
 
<%text>## Refer to the documentation ("Integration with issue trackers") for more details.</%text>
 

	
 
<%text>## regular expression to match issue references</%text>
 
<%text>## This pattern may/should contain parenthesized groups, that can</%text>
 
<%text>## be referred to in issue_server_link or issue_sub using Python backreferences</%text>
 
<%text>## (e.g. \1, \2, ...). You can also create named groups with '(?P<groupname>)'.</%text>
 
<%text>## To require mandatory whitespace before the issue pattern, use:</%text>
 
<%text>## (?:^|(?<=\s)) before the actual pattern, and for mandatory whitespace</%text>
 
<%text>## behind the issue pattern, use (?:$|(?=\s)) after the actual pattern.</%text>
 

	
 
issue_pat = #(\d+)
 

	
 
<%text>## server url to the issue</%text>
kallithea/tests/functional/test_login.py
Show inline comments
 
@@ -420,101 +420,74 @@ class TestLoginController(TestController
 
                                    action='password_reset_confirmation',
 
                                    email=email,
 
                                    timestamp=timestamp,
 
                                    token=token))
 
        assert response.status == '200 OK'
 
        response.mustcontain("You are about to set a new password for the email address %s" % email)
 

	
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset_confirmation'),
 
                                 {'email': email,
 
                                  'timestamp': timestamp,
 
                                  'password': "p@ssw0rd",
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': token,
 
                                 })
 
        assert response.status == '302 Found'
 
        self.checkSessionFlash(response, 'Successfully updated password')
 

	
 
        response = response.follow()
 

	
 
    #==========================================================================
 
    # API
 
    #==========================================================================
 

	
 
    def _get_api_whitelist(self, values=None):
 
        config = {'api_access_controllers_whitelist': values or []}
 
        return config
 

	
 
    def _api_key_test(self, api_key, status):
 
        """Verifies HTTP status code for accessing an auth-requiring page,
 
        using the given api_key URL parameter as well as using the API key
 
        with bearer authentication.
 

	
 
        If api_key is None, no api_key is passed at all. If api_key is True,
 
        a real, working API key is used.
 
        """
 
        with fixture.anon_access(False):
 
            if api_key is None:
 
                params = {}
 
                headers = {}
 
            else:
 
                if api_key is True:
 
                    api_key = User.get_first_admin().api_key
 
                params = {'api_key': api_key}
 
                headers = {'Authorization': 'Bearer ' + str(api_key)}
 

	
 
            self.app.get(url(controller='changeset', action='changeset_raw',
 
                             repo_name=HG_REPO, revision='tip', **params),
 
                         status=status)
 

	
 
            self.app.get(url(controller='changeset', action='changeset_raw',
 
                             repo_name=HG_REPO, revision='tip'),
 
                         headers=headers,
 
                         status=status)
 

	
 
    @parametrize('test_name,api_key,code', [
 
        ('none', None, 302),
 
        ('empty_string', '', 403),
 
        ('fake_number', '123456', 403),
 
        ('proper_api_key', True, 403)
 
    ])
 
    def test_access_not_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist([])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert [] == whitelist['api_access_controllers_whitelist']
 
            self._api_key_test(api_key, code)
 

	
 
    @parametrize('test_name,api_key,code', [
 
        ('none', None, 302),
 
        ('empty_string', '', 403),
 
        ('fake_number', '123456', 403),
 
        ('fake_not_alnum', 'a-z', 403),
 
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 403),
 
        ('proper_api_key', True, 200)
 
    ])
 
    def test_access_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 
            self._api_key_test(api_key, code)
 
    def test_access_page_via_api_key(self, test_name, api_key, code):
 
        self._api_key_test(api_key, code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            self._api_key_test(new_api_key.api_key, status=200)
 
        new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
        Session().commit()
 
        self._api_key_test(new_api_key.api_key, status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            # patch the API key and make it expired
 
            new_api_key.expires = 0
 
            Session().commit()
 
            self._api_key_test(new_api_key.api_key, status=403)
 
        new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
        Session().commit()
 
        # patch the API key and make it expired
 
        new_api_key.expires = 0
 
        Session().commit()
 
        self._api_key_test(new_api_key.api_key, status=403)
0 comments (0 inline, 0 general)