Changeset - 06398585de03
[Not reviewed]
default
0 3 0
Søren Løvborg - 9 years ago 2016-11-10 20:38:40
sorenl@unity3d.com
auth: track API key used for authentication in AuthUser

This allows us to define only once how an API key is passed to the app.
We might e.g. allow API keys to be passed in an HTTP header; with this
change, we only need to update the code in one place.

Also change the code to verify up front that the API key resolved to a
valid and active user, so LoginRequired doesn't need to do that.

Also return plain 403 Forbidden for bad API keys instead of redirecting
to the login form, which makes more sense for non-interactive clients
(the typical users of API keys).
3 files changed with 22 insertions and 22 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
@@ -452,53 +452,54 @@ class AuthUser(object):
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users and the constructor
 
    sets the `is_authenticated` field to False. It's up to other parts
 
    of the code to check e.g. if a supplied password is correct, and if
 
    so, set `is_authenticated` to True.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
    "no user". On the other hand, if the default user is disabled (and
 
    there is no login information), we instead get "no user"; this should
 
    only happen on the login page (as all other requests are redirected).
 

	
 
    `is_default_user` specifically checks if the AuthUser is the user named
 
    "default". Use `is_anonymous` to check for both "default" and "no user".
 
    """
 

	
 
    def __init__(self, user_id=None, dbuser=None,
 
    def __init__(self, user_id=None, dbuser=None, authenticating_api_key=None,
 
            is_external_auth=False):
 

	
 
        self.is_authenticated = False
 
        self.is_external_auth = is_external_auth
 
        self.authenticating_api_key = authenticating_api_key
 

	
 
        user_model = UserModel()
 
        self._default_user = User.get_default_user(cache=True)
 

	
 
        # These attributes will be overridden by fill_data, below, unless the
 
        # requested user cannot be found and the default anonymous user is
 
        # not enabled.
 
        self.user_id = None
 
        self.username = None
 
        self.api_key = None
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.admin = False
 
        self.inherit_default_permissions = False
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            log.debug('Auth User lookup by USER ID %s', user_id)
 
            dbuser = user_model.get(user_id)
 
        else:
 
            # Note: dbuser is allowed to be None.
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
@@ -717,64 +718,60 @@ def _redirect_to_login(message=None):
 
class LoginRequired(object):
 
    """Client must be logged in as a valid User (but the "default" user,
 
    if enabled, is considered valid), or we'll redirect to the login page.
 

	
 
    Also checks that IP address is allowed, and if using API key instead
 
    of regular cookie authentication, checks that API key access is allowed
 
    (based on `api_access` parameter and the API view whitelist).
 
    """
 

	
 
    def __init__(self, api_access=False):
 
        self.api_access = api_access
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        controller = fargs[0]
 
        user = controller.authuser
 
        loc = "%s:%s" % (controller.__class__.__name__, func.__name__)
 
        log.debug('Checking access for user %s @ %s', user, loc)
 

	
 
        if not AuthUser.check_ip_allowed(user, controller.ip_addr):
 
            raise _redirect_to_login(_('IP %s not allowed') % controller.ip_addr)
 

	
 
        # check if we used an API key and it's a valid one
 
        api_key = request.GET.get('api_key')
 
        # Check if we used an API key to authenticate.
 
        api_key = user.authenticating_api_key
 
        if api_key is not None:
 
            # explicit controller is enabled or API is in our whitelist
 
            if self.api_access or allowed_api_access(loc, api_key=api_key):
 
                if api_key in user.api_keys:
 
            # Check that controller is enabled for API key usage.
 
            if not self.api_access and not allowed_api_access(loc, api_key=api_key):
 
                # controller does not allow API access
 
                log.warning('API access to %s is not allowed', loc)
 
                raise HTTPForbidden()
 

	
 
                    log.info('user %s authenticated with API key ****%s @ %s',
 
                             user, api_key[-4:], loc)
 
                    return func(*fargs, **fkwargs)
 
                else:
 
                    log.warning('API key ****%s is NOT valid', api_key[-4:])
 
                    raise _redirect_to_login(_('Invalid API key'))
 
            else:
 
                # controller does not allow API access
 
                log.warning('API access to %s is not allowed', loc)
 
                raise HTTPForbidden()
 

	
 
        # CSRF protection: Whenever a request has ambient authority (whether
 
        # through a session cookie or its origin IP address), it must include
 
        # the correct token, unless the HTTP method is GET or HEAD (and thus
 
        # guaranteed to be side effect free. In practice, the only situation
 
        # where we allow side effects without ambient authority is when the
 
        # authority comes from an API key; and that is handled above.
 
        if request.method not in ['GET', 'HEAD']:
 
            token = request.POST.get(secure_form.token_key)
 
            if not token or token != secure_form.authentication_token():
 
                log.error('CSRF check failed')
 
                raise HTTPForbidden()
 

	
 
        # regular user authentication
 
        if user.is_authenticated or user.is_default_user:
 
            log.info('user %s authenticated with regular auth @ %s', user, loc)
 
            return func(*fargs, **fkwargs)
 
        else:
 
            log.warning('user %s NOT authenticated with regular auth @ %s', user, loc)
 
            raise _redirect_to_login()
 

	
 
class NotAnonymous(object):
 
    """Ensures that client is not logged in as the "default" user, and
 
    redirects to the login page otherwise. Must be used together with
kallithea/lib/base.py
Show inline comments
 
@@ -351,52 +351,55 @@ class BaseController(WSGIController):
 
        c.instance_id = config.get('instance_id')
 
        c.issues_url = config.get('bugtracker', url('issues_url'))
 
        # END CONFIG VARS
 

	
 
        c.repo_name = get_repo_slug(request)  # can be empty
 
        c.backends = BACKENDS.keys()
 
        c.unread_notifications = NotificationModel() \
 
                        .get_unread_cnt_for_user(c.authuser.user_id)
 

	
 
        self.cut_off_limit = safe_int(config.get('cut_off_limit'))
 

	
 
        c.my_pr_count = PullRequest.query(reviewer_id=c.authuser.user_id, include_closed=False).count()
 

	
 
        self.sa = meta.Session
 
        self.scm_model = ScmModel(self.sa)
 

	
 
    @staticmethod
 
    def _determine_auth_user(api_key, session_authuser):
 
        """
 
        Create an `AuthUser` object given the API key (if any) and the
 
        value of the authuser session cookie.
 
        """
 

	
 
        # Authenticate by API key
 
        if api_key:
 
            # when using API_KEY we are sure user exists.
 
            return AuthUser(dbuser=User.get_by_api_key(api_key),
 
                            is_external_auth=True)
 
        if api_key is not None:
 
            au = AuthUser(dbuser=User.get_by_api_key(api_key),
 
                authenticating_api_key=api_key, is_external_auth=True)
 
            if au.is_anonymous:
 
                log.warning('API key ****%s is NOT valid', api_key[-4:])
 
                raise webob.exc.HTTPForbidden(_('Invalid API key'))
 
            return au
 

	
 
        # Authenticate by session cookie
 
        # In ancient login sessions, 'authuser' may not be a dict.
 
        # In that case, the user will have to log in again.
 
        # v0.3 and earlier included an 'is_authenticated' key; if present,
 
        # this must be True.
 
        if isinstance(session_authuser, dict) and session_authuser.get('is_authenticated', True):
 
            try:
 
                return AuthUser.from_cookie(session_authuser)
 
            except UserCreationError as e:
 
                # container auth or other auth functions that create users on
 
                # the fly can throw UserCreationError to signal issues with
 
                # user creation. Explanation should be provided in the
 
                # exception object.
 
                from kallithea.lib import helpers as h
 
                h.flash(e, 'error', logf=log.error)
 

	
 
        # Authenticate by auth_container plugin (if enabled)
 
        if any(
 
            auth_modules.importplugin(name).is_container_auth
 
            for name in Setting.get_auth_plugins()
 
        ):
 
            try:
 
                user_info = auth_modules.authenticate('', '', request.environ)
kallithea/tests/functional/test_login.py
Show inline comments
 
@@ -444,56 +444,56 @@ class TestLoginController(TestController
 
                params = {}
 
            else:
 
                if api_key is True:
 
                    api_key = User.get_first_admin().api_key
 
                params = {'api_key': api_key}
 

	
 
            self.app.get(url(controller='changeset', action='changeset_raw',
 
                             repo_name=HG_REPO, revision='tip', **params),
 
                         status=status)
 

	
 
    @parametrize('test_name,api_key,code', [
 
        ('none', None, 302),
 
        ('empty_string', '', 403),
 
        ('fake_number', '123456', 403),
 
        ('proper_api_key', True, 403)
 
    ])
 
    def test_access_not_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist([])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert [] == whitelist['api_access_controllers_whitelist']
 
            self._api_key_test(api_key, code)
 

	
 
    @parametrize('test_name,api_key,code', [
 
        ('none', None, 302),
 
        ('empty_string', '', 302),
 
        ('fake_number', '123456', 302),
 
        ('fake_not_alnum', 'a-z', 302),
 
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 302),
 
        ('empty_string', '', 403),
 
        ('fake_number', '123456', 403),
 
        ('fake_not_alnum', 'a-z', 403),
 
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 403),
 
        ('proper_api_key', True, 200)
 
    ])
 
    def test_access_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 
            self._api_key_test(api_key, code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            self._api_key_test(new_api_key.api_key, status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            #patch the API key and make it expired
 
            new_api_key.expires = 0
 
            Session().commit()
 
            self._api_key_test(new_api_key.api_key, status=302)
 
            self._api_key_test(new_api_key.api_key, status=403)
0 comments (0 inline, 0 general)