Changeset - 06398585de03
[Not reviewed]
default
0 3 0
Søren Løvborg - 9 years ago 2016-11-10 20:38:40
sorenl@unity3d.com
auth: track API key used for authentication in AuthUser

This allows us to define only once how an API key is passed to the app.
We might e.g. allow API keys to be passed in an HTTP header; with this
change, we only need to update the code in one place.

Also change the code to verify up front that the API key resolved to a
valid and active user, so LoginRequired doesn't need to do that.

Also return plain 403 Forbidden for bad API keys instead of redirecting
to the login form, which makes more sense for non-interactive clients
(the typical users of API keys).
3 files changed with 22 insertions and 22 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
@@ -428,101 +428,102 @@ def allowed_api_access(controller_name, 
 
        whitelist = aslist(CONFIG.get('api_access_controllers_whitelist'),
 
                           sep=',')
 
        log.debug('whitelist of API access is: %s', whitelist)
 
    api_access_valid = controller_name in whitelist
 
    if api_access_valid:
 
        log.debug('controller:%s is in API whitelist', controller_name)
 
    else:
 
        msg = 'controller: %s is *NOT* in API whitelist' % (controller_name)
 
        if api_key:
 
            #if we use API key and don't have access it's a warning
 
            log.warning(msg)
 
        else:
 
            log.debug(msg)
 
    return api_access_valid
 

	
 

	
 
class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
    but is also used as a generic user information data structure in
 
    parts of the code, e.g. user management.
 

	
 
    Constructed from a database `User` object, a user ID or cookie dict,
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users and the constructor
 
    sets the `is_authenticated` field to False. It's up to other parts
 
    of the code to check e.g. if a supplied password is correct, and if
 
    so, set `is_authenticated` to True.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
    "no user". On the other hand, if the default user is disabled (and
 
    there is no login information), we instead get "no user"; this should
 
    only happen on the login page (as all other requests are redirected).
 

	
 
    `is_default_user` specifically checks if the AuthUser is the user named
 
    "default". Use `is_anonymous` to check for both "default" and "no user".
 
    """
 

	
 
    def __init__(self, user_id=None, dbuser=None,
 
    def __init__(self, user_id=None, dbuser=None, authenticating_api_key=None,
 
            is_external_auth=False):
 

	
 
        self.is_authenticated = False
 
        self.is_external_auth = is_external_auth
 
        self.authenticating_api_key = authenticating_api_key
 

	
 
        user_model = UserModel()
 
        self._default_user = User.get_default_user(cache=True)
 

	
 
        # These attributes will be overridden by fill_data, below, unless the
 
        # requested user cannot be found and the default anonymous user is
 
        # not enabled.
 
        self.user_id = None
 
        self.username = None
 
        self.api_key = None
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.admin = False
 
        self.inherit_default_permissions = False
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            log.debug('Auth User lookup by USER ID %s', user_id)
 
            dbuser = user_model.get(user_id)
 
        else:
 
            # Note: dbuser is allowed to be None.
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
        is_user_loaded = self._fill_data(dbuser)
 

	
 
        # If user cannot be found, try falling back to anonymous.
 
        if not is_user_loaded:
 
            is_user_loaded =  self._fill_data(self._default_user)
 

	
 
        self.is_default_user = (self.user_id == self._default_user.user_id)
 
        self.is_anonymous = not is_user_loaded or self.is_default_user
 

	
 
        if not self.username:
 
            self.username = 'None'
 

	
 
        log.debug('Auth User is now %s', self)
 

	
 
    def _fill_data(self, dbuser):
 
        """
 
        Copies database fields from a `db.User` to this `AuthUser`. Does
 
        not copy `api_keys` and `permissions` attributes.
 

	
 
        Checks that `dbuser` is `active` (and not None) before copying;
 
        returns True on success.
 
        """
 
        if dbuser is not None and dbuser.active:
 
            log.debug('filling %s data', dbuser)
 
@@ -693,112 +694,108 @@ def set_available_permissions(config):
 
    try:
 
        sa = meta.Session
 
        all_perms = sa.query(Permission).all()
 
        config['available_permissions'] = [x.permission_name for x in all_perms]
 
    finally:
 
        meta.Session.remove()
 

	
 

	
 
#==============================================================================
 
# CHECK DECORATORS
 
#==============================================================================
 

	
 
def _redirect_to_login(message=None):
 
    """Return an exception that must be raised. It will redirect to the login
 
    page which will redirect back to the current URL after authentication.
 
    The optional message will be shown in a flash message."""
 
    from kallithea.lib import helpers as h
 
    if message:
 
        h.flash(h.literal(message), category='warning')
 
    p = request.path_qs
 
    log.debug('Redirecting to login page, origin: %s', p)
 
    return HTTPFound(location=url('login_home', came_from=p))
 

	
 

	
 
class LoginRequired(object):
 
    """Client must be logged in as a valid User (but the "default" user,
 
    if enabled, is considered valid), or we'll redirect to the login page.
 

	
 
    Also checks that IP address is allowed, and if using API key instead
 
    of regular cookie authentication, checks that API key access is allowed
 
    (based on `api_access` parameter and the API view whitelist).
 
    """
 

	
 
    def __init__(self, api_access=False):
 
        self.api_access = api_access
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        controller = fargs[0]
 
        user = controller.authuser
 
        loc = "%s:%s" % (controller.__class__.__name__, func.__name__)
 
        log.debug('Checking access for user %s @ %s', user, loc)
 

	
 
        if not AuthUser.check_ip_allowed(user, controller.ip_addr):
 
            raise _redirect_to_login(_('IP %s not allowed') % controller.ip_addr)
 

	
 
        # check if we used an API key and it's a valid one
 
        api_key = request.GET.get('api_key')
 
        # Check if we used an API key to authenticate.
 
        api_key = user.authenticating_api_key
 
        if api_key is not None:
 
            # explicit controller is enabled or API is in our whitelist
 
            if self.api_access or allowed_api_access(loc, api_key=api_key):
 
                if api_key in user.api_keys:
 
            # Check that controller is enabled for API key usage.
 
            if not self.api_access and not allowed_api_access(loc, api_key=api_key):
 
                # controller does not allow API access
 
                log.warning('API access to %s is not allowed', loc)
 
                raise HTTPForbidden()
 

	
 
                    log.info('user %s authenticated with API key ****%s @ %s',
 
                             user, api_key[-4:], loc)
 
                    return func(*fargs, **fkwargs)
 
                else:
 
                    log.warning('API key ****%s is NOT valid', api_key[-4:])
 
                    raise _redirect_to_login(_('Invalid API key'))
 
            else:
 
                # controller does not allow API access
 
                log.warning('API access to %s is not allowed', loc)
 
                raise HTTPForbidden()
 

	
 
        # CSRF protection: Whenever a request has ambient authority (whether
 
        # through a session cookie or its origin IP address), it must include
 
        # the correct token, unless the HTTP method is GET or HEAD (and thus
 
        # guaranteed to be side effect free. In practice, the only situation
 
        # where we allow side effects without ambient authority is when the
 
        # authority comes from an API key; and that is handled above.
 
        if request.method not in ['GET', 'HEAD']:
 
            token = request.POST.get(secure_form.token_key)
 
            if not token or token != secure_form.authentication_token():
 
                log.error('CSRF check failed')
 
                raise HTTPForbidden()
 

	
 
        # regular user authentication
 
        if user.is_authenticated or user.is_default_user:
 
            log.info('user %s authenticated with regular auth @ %s', user, loc)
 
            return func(*fargs, **fkwargs)
 
        else:
 
            log.warning('user %s NOT authenticated with regular auth @ %s', user, loc)
 
            raise _redirect_to_login()
 

	
 
class NotAnonymous(object):
 
    """Ensures that client is not logged in as the "default" user, and
 
    redirects to the login page otherwise. Must be used together with
 
    LoginRequired."""
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        self.user = cls.authuser
 

	
 
        log.debug('Checking if user is not anonymous @%s', cls)
 

	
 
        if self.user.is_default_user:
 
            raise _redirect_to_login(_('You need to be a registered user to '
 
                                       'perform this action'))
 
        else:
 
            return func(*fargs, **fkwargs)
 

	
 

	
 
class PermsDecorator(object):
 
    """Base class for controller decorators"""
 

	
 
    def __init__(self, *required_perms):
 
        self.required_perms = set(required_perms)
 
        self.user_perms = None
kallithea/lib/base.py
Show inline comments
 
@@ -327,100 +327,103 @@ class BaseController(WSGIController):
 
        c.visual.use_gravatar = str2bool(rc_config.get('use_gravatar'))
 
        c.visual.gravatar_url = rc_config.get('gravatar_url')
 

	
 
        c.ga_code = rc_config.get('ga_code')
 
        # TODO: replace undocumented backwards compatibility hack with db upgrade and rename ga_code
 
        if c.ga_code and '<' not in c.ga_code:
 
            c.ga_code = '''<script type="text/javascript">
 
                var _gaq = _gaq || [];
 
                _gaq.push(['_setAccount', '%s']);
 
                _gaq.push(['_trackPageview']);
 

	
 
                (function() {
 
                    var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
 
                    ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
 
                    var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
 
                    })();
 
            </script>''' % c.ga_code
 
        c.site_name = rc_config.get('title')
 
        c.clone_uri_tmpl = rc_config.get('clone_uri_tmpl')
 

	
 
        ## INI stored
 
        c.visual.allow_repo_location_change = str2bool(config.get('allow_repo_location_change', True))
 
        c.visual.allow_custom_hooks_settings = str2bool(config.get('allow_custom_hooks_settings', True))
 

	
 
        c.instance_id = config.get('instance_id')
 
        c.issues_url = config.get('bugtracker', url('issues_url'))
 
        # END CONFIG VARS
 

	
 
        c.repo_name = get_repo_slug(request)  # can be empty
 
        c.backends = BACKENDS.keys()
 
        c.unread_notifications = NotificationModel() \
 
                        .get_unread_cnt_for_user(c.authuser.user_id)
 

	
 
        self.cut_off_limit = safe_int(config.get('cut_off_limit'))
 

	
 
        c.my_pr_count = PullRequest.query(reviewer_id=c.authuser.user_id, include_closed=False).count()
 

	
 
        self.sa = meta.Session
 
        self.scm_model = ScmModel(self.sa)
 

	
 
    @staticmethod
 
    def _determine_auth_user(api_key, session_authuser):
 
        """
 
        Create an `AuthUser` object given the API key (if any) and the
 
        value of the authuser session cookie.
 
        """
 

	
 
        # Authenticate by API key
 
        if api_key:
 
            # when using API_KEY we are sure user exists.
 
            return AuthUser(dbuser=User.get_by_api_key(api_key),
 
                            is_external_auth=True)
 
        if api_key is not None:
 
            au = AuthUser(dbuser=User.get_by_api_key(api_key),
 
                authenticating_api_key=api_key, is_external_auth=True)
 
            if au.is_anonymous:
 
                log.warning('API key ****%s is NOT valid', api_key[-4:])
 
                raise webob.exc.HTTPForbidden(_('Invalid API key'))
 
            return au
 

	
 
        # Authenticate by session cookie
 
        # In ancient login sessions, 'authuser' may not be a dict.
 
        # In that case, the user will have to log in again.
 
        # v0.3 and earlier included an 'is_authenticated' key; if present,
 
        # this must be True.
 
        if isinstance(session_authuser, dict) and session_authuser.get('is_authenticated', True):
 
            try:
 
                return AuthUser.from_cookie(session_authuser)
 
            except UserCreationError as e:
 
                # container auth or other auth functions that create users on
 
                # the fly can throw UserCreationError to signal issues with
 
                # user creation. Explanation should be provided in the
 
                # exception object.
 
                from kallithea.lib import helpers as h
 
                h.flash(e, 'error', logf=log.error)
 

	
 
        # Authenticate by auth_container plugin (if enabled)
 
        if any(
 
            auth_modules.importplugin(name).is_container_auth
 
            for name in Setting.get_auth_plugins()
 
        ):
 
            try:
 
                user_info = auth_modules.authenticate('', '', request.environ)
 
            except UserCreationError as e:
 
                from kallithea.lib import helpers as h
 
                h.flash(e, 'error', logf=log.error)
 
            else:
 
                if user_info is not None:
 
                    username = user_info['username']
 
                    user = User.get_by_username(username, case_insensitive=True)
 
                    return log_in_user(user, remember=False,
 
                                       is_external_auth=True)
 

	
 
        # User is anonymous
 
        return AuthUser()
 

	
 
    @staticmethod
 
    def _basic_security_checks():
 
        """Perform basic security/sanity checks before processing the request."""
 

	
 
        # Only allow the following HTTP request methods.
 
        if request.method not in ['GET', 'HEAD', 'POST']:
 
            raise webob.exc.HTTPMethodNotAllowed()
 

	
 
        # Also verify the _method override - no longer allowed.
 
        if request.params.get('_method') is None:
 
            pass # no override, no problem
kallithea/tests/functional/test_login.py
Show inline comments
 
@@ -420,80 +420,80 @@ class TestLoginController(TestController
 
                                  'password_confirm': "p@ssw0rd",
 
                                  'token': token,
 
                                 })
 
        assert response.status == '302 Found'
 
        self.checkSessionFlash(response, 'Successfully updated password')
 

	
 
        response = response.follow()
 

	
 
    #==========================================================================
 
    # API
 
    #==========================================================================
 

	
 
    def _get_api_whitelist(self, values=None):
 
        config = {'api_access_controllers_whitelist': values or []}
 
        return config
 

	
 
    def _api_key_test(self, api_key, status):
 
        """Verifies HTTP status code for accessing an auth-requiring page,
 
        using the given api_key URL parameter. If api_key is None, no api_key
 
        parameter is passed at all. If api_key is True, a real, working API key
 
        is used.
 
        """
 
        with fixture.anon_access(False):
 
            if api_key is None:
 
                params = {}
 
            else:
 
                if api_key is True:
 
                    api_key = User.get_first_admin().api_key
 
                params = {'api_key': api_key}
 

	
 
            self.app.get(url(controller='changeset', action='changeset_raw',
 
                             repo_name=HG_REPO, revision='tip', **params),
 
                         status=status)
 

	
 
    @parametrize('test_name,api_key,code', [
 
        ('none', None, 302),
 
        ('empty_string', '', 403),
 
        ('fake_number', '123456', 403),
 
        ('proper_api_key', True, 403)
 
    ])
 
    def test_access_not_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist([])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert [] == whitelist['api_access_controllers_whitelist']
 
            self._api_key_test(api_key, code)
 

	
 
    @parametrize('test_name,api_key,code', [
 
        ('none', None, 302),
 
        ('empty_string', '', 302),
 
        ('fake_number', '123456', 302),
 
        ('fake_not_alnum', 'a-z', 302),
 
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 302),
 
        ('empty_string', '', 403),
 
        ('fake_number', '123456', 403),
 
        ('fake_not_alnum', 'a-z', 403),
 
        ('fake_api_key', '0123456789abcdef0123456789ABCDEF01234567', 403),
 
        ('proper_api_key', True, 200)
 
    ])
 
    def test_access_whitelisted_page_via_api_key(self, test_name, api_key, code):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 
            self._api_key_test(api_key, code)
 

	
 
    def test_access_page_via_extra_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            self._api_key_test(new_api_key.api_key, status=200)
 

	
 
    def test_access_page_via_expired_api_key(self):
 
        whitelist = self._get_api_whitelist(['ChangesetController:changeset_raw'])
 
        with mock.patch('kallithea.CONFIG', whitelist):
 
            assert ['ChangesetController:changeset_raw'] == whitelist['api_access_controllers_whitelist']
 

	
 
            new_api_key = ApiKeyModel().create(TEST_USER_ADMIN_LOGIN, u'test')
 
            Session().commit()
 
            #patch the API key and make it expired
 
            new_api_key.expires = 0
 
            Session().commit()
 
            self._api_key_test(new_api_key.api_key, status=302)
 
            self._api_key_test(new_api_key.api_key, status=403)
0 comments (0 inline, 0 general)