Changeset - b5551ad26fa3
[Not reviewed]
default
0 3 0
Søren Løvborg - 9 years ago 2017-01-04 23:01:48
sorenl@unity3d.com
vcs: dedup auth code between Hg and Git middleware

There's a lot of verbatim duplicated code in the Hg and Git VCS
middleware. This attempts to deduplicate a bit of it.

The _authorize function is a bit awkward, but for now the goal is simply
to remove duplicated code, not improving program structure and design.

As such, the code in _authorize is almost a verbatim copy of the code
removed in the two controllers.
3 files changed with 81 insertions and 118 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/base.py
Show inline comments
 
@@ -134,106 +134,181 @@ def log_in_user(user, remember, is_exter
 
        t = datetime.datetime.now() + datetime.timedelta(days=365)
 
        session._set_cookie_expires(t)
 

	
 
    session.save()
 

	
 
    log.info('user %s is now authenticated and stored in '
 
             'session, session attrs %s', user.username, cookie)
 

	
 
    # dumps session attrs back to cookie
 
    session._update_cookie_out()
 

	
 
    return auth_user
 

	
 

	
 
class BasicAuth(paste.auth.basic.AuthBasicAuthenticator):
 

	
 
    def __init__(self, realm, authfunc, auth_http_code=None):
 
        self.realm = realm
 
        self.authfunc = authfunc
 
        self._rc_auth_http_code = auth_http_code
 

	
 
    def build_authentication(self):
 
        head = paste.httpheaders.WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
 
        if self._rc_auth_http_code and self._rc_auth_http_code == '403':
 
            # return 403 if alternative http return code is specified in
 
            # Kallithea config
 
            return paste.httpexceptions.HTTPForbidden(headers=head)
 
        return paste.httpexceptions.HTTPUnauthorized(headers=head)
 

	
 
    def authenticate(self, environ):
 
        authorization = paste.httpheaders.AUTHORIZATION(environ)
 
        if not authorization:
 
            return self.build_authentication()
 
        (authmeth, auth) = authorization.split(' ', 1)
 
        if 'basic' != authmeth.lower():
 
            return self.build_authentication()
 
        auth = auth.strip().decode('base64')
 
        _parts = auth.split(':', 1)
 
        if len(_parts) == 2:
 
            username, password = _parts
 
            if self.authfunc(username, password, environ) is not None:
 
                return username
 
        return self.build_authentication()
 

	
 
    __call__ = authenticate
 

	
 

	
 
class BaseVCSController(object):
 
    """Base controller for handling Mercurial/Git protocol requests
 
    (coming from a VCS client, and not a browser).
 
    """
 

	
 
    def __init__(self, application, config):
 
        self.application = application
 
        self.config = config
 
        # base path of repo locations
 
        self.basepath = self.config['base_path']
 
        # authenticate this VCS request using the authentication modules
 
        self.authenticate = BasicAuth('', auth_modules.authenticate,
 
                                      config.get('auth_ret_code'))
 

	
 
    def _authorize(self, environ, start_response, action, repo_name, ip_addr):
 
        """Authenticate and authorize user.
 

	
 
        Since we're dealing with a VCS client and not a browser, we only
 
        support HTTP basic authentication, either directly via raw header
 
        inspection, or by using container authentication to delegate the
 
        authentication to the web server.
 

	
 
        Returns (user, None) on successful authentication and authorization.
 
        Returns (None, wsgi_app) to send the wsgi_app response to the client.
 
        """
 
        anonymous_user = User.get_default_user(cache=True)
 
        user = anonymous_user
 
        if anonymous_user.active:
 
            # ONLY check permissions if the user is activated
 
            anonymous_perm = self._check_permission(action, anonymous_user,
 
                                                    repo_name, ip_addr)
 
        else:
 
            anonymous_perm = False
 

	
 
        if not anonymous_user.active or not anonymous_perm:
 
            if not anonymous_user.active:
 
                log.debug('Anonymous access is disabled, running '
 
                          'authentication')
 

	
 
            if not anonymous_perm:
 
                log.debug('Not enough credentials to access this '
 
                          'repository as anonymous user')
 

	
 
            username = None
 
            #==============================================================
 
            # DEFAULT PERM FAILED OR ANONYMOUS ACCESS IS DISABLED SO WE
 
            # NEED TO AUTHENTICATE AND ASK FOR AUTH USER PERMISSIONS
 
            #==============================================================
 

	
 
            # try to auth based on environ, container auth methods
 
            log.debug('Running PRE-AUTH for container based authentication')
 
            pre_auth = auth_modules.authenticate('', '', environ)
 
            if pre_auth is not None and pre_auth.get('username'):
 
                username = pre_auth['username']
 
            log.debug('PRE-AUTH got %s as username', username)
 

	
 
            # If not authenticated by the container, running basic auth
 
            if not username:
 
                self.authenticate.realm = \
 
                    safe_str(self.config['realm'])
 
                result = self.authenticate(environ)
 
                if isinstance(result, str):
 
                    paste.httpheaders.AUTH_TYPE.update(environ, 'basic')
 
                    paste.httpheaders.REMOTE_USER.update(environ, result)
 
                    username = result
 
                else:
 
                    return None, result.wsgi_application
 

	
 
            #==============================================================
 
            # CHECK PERMISSIONS FOR THIS REQUEST USING GIVEN USERNAME
 
            #==============================================================
 
            try:
 
                user = User.get_by_username_or_email(username)
 
                if user is None or not user.active:
 
                    return None, webob.exc.HTTPForbidden()
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                return None, webob.exc.HTTPInternalServerError()
 

	
 
            #check permissions for this repository
 
            perm = self._check_permission(action, user, repo_name, ip_addr)
 
            if not perm:
 
                return None, webob.exc.HTTPForbidden()
 

	
 
        return user, None
 

	
 
    def _handle_request(self, environ, start_response):
 
        raise NotImplementedError()
 

	
 
    def _get_by_id(self, repo_name):
 
        """
 
        Gets a special pattern _<ID> from clone url and tries to replace it
 
        with a repository_name for support of _<ID> permanent URLs
 

	
 
        :param repo_name:
 
        """
 

	
 
        data = repo_name.split('/')
 
        if len(data) >= 2:
 
            from kallithea.lib.utils import get_repo_by_id
 
            by_id_match = get_repo_by_id(repo_name)
 
            if by_id_match:
 
                data[1] = safe_str(by_id_match)
 

	
 
        return '/'.join(data)
 

	
 
    def _invalidate_cache(self, repo_name):
 
        """
 
        Sets cache for this repository for invalidation on next access
 

	
 
        :param repo_name: full repo name, also a cache key
 
        """
 
        ScmModel().mark_for_invalidation(repo_name)
 

	
 
    def _check_permission(self, action, user, repo_name, ip_addr=None):
 
        """
 
        Checks permissions using action (push/pull) user and repository
 
        name
 

	
 
        :param action: push or pull action
 
        :param user: `User` instance
 
        :param repo_name: repository name
 
        """
 
        # check IP
 
        ip_allowed = AuthUser.check_ip_allowed(user, ip_addr)
 
        if ip_allowed:
 
            log.info('Access for IP:%s allowed', ip_addr)
 
        else:
 
            return False
 

	
 
        if action == 'push':
 
            if not HasPermissionAnyMiddleware('repository.write',
 
                                              'repository.admin')(user,
 
                                                                  repo_name):
kallithea/lib/middleware/simplegit.py
Show inline comments
 
@@ -50,154 +50,99 @@ log = logging.getLogger(__name__)
 

	
 

	
 
GIT_PROTO_PAT = re.compile(r'^/(.+)/(info/refs|git-upload-pack|git-receive-pack)')
 

	
 

	
 
def is_git(environ):
 
    path_info = environ['PATH_INFO']
 
    isgit_path = GIT_PROTO_PAT.match(path_info)
 
    log.debug('pathinfo: %s detected as Git %s',
 
        path_info, isgit_path is not None
 
    )
 
    return isgit_path
 

	
 

	
 
class SimpleGit(BaseVCSController):
 

	
 
    def _handle_request(self, environ, start_response):
 
        if not is_git(environ):
 
            return self.application(environ, start_response)
 

	
 
        ip_addr = self._get_ip_addr(environ)
 
        self._git_first_op = False
 
        # skip passing error to error controller
 
        environ['pylons.status_code_redirect'] = True
 

	
 
        #======================================================================
 
        # EXTRACT REPOSITORY NAME FROM ENV
 
        #======================================================================
 
        try:
 
            str_repo_name = self.__get_repository(environ)
 
            repo_name = safe_unicode(str_repo_name)
 
            log.debug('Extracted repo name is %s', repo_name)
 
        except Exception as e:
 
            log.error('error extracting repo_name: %r', e)
 
            return HTTPInternalServerError()(environ, start_response)
 

	
 
        # quick check if that dir exists...
 
        if not is_valid_repo(repo_name, self.basepath, 'git'):
 
            return HTTPNotFound()(environ, start_response)
 

	
 
        #======================================================================
 
        # GET ACTION PULL or PUSH
 
        #======================================================================
 
        action = self.__get_action(environ)
 

	
 
        #======================================================================
 
        # CHECK PERMISSIONS
 
        #======================================================================
 
        anonymous_user = User.get_default_user(cache=True)
 
        user = anonymous_user
 
        if anonymous_user.active:
 
            # ONLY check permissions if the user is activated
 
            anonymous_perm = self._check_permission(action, anonymous_user,
 
                                                    repo_name, ip_addr)
 
        else:
 
            anonymous_perm = False
 

	
 
        if not anonymous_user.active or not anonymous_perm:
 
            if not anonymous_user.active:
 
                log.debug('Anonymous access is disabled, running '
 
                          'authentication')
 

	
 
            if not anonymous_perm:
 
                log.debug('Not enough credentials to access this '
 
                          'repository as anonymous user')
 

	
 
            username = None
 
            #==============================================================
 
            # DEFAULT PERM FAILED OR ANONYMOUS ACCESS IS DISABLED SO WE
 
            # NEED TO AUTHENTICATE AND ASK FOR AUTH USER PERMISSIONS
 
            #==============================================================
 

	
 
            # try to auth based on environ, container auth methods
 
            log.debug('Running PRE-AUTH for container based authentication')
 
            pre_auth = auth_modules.authenticate('', '', environ)
 
            if pre_auth is not None and pre_auth.get('username'):
 
                username = pre_auth['username']
 
            log.debug('PRE-AUTH got %s as username', username)
 

	
 
            # If not authenticated by the container, running basic auth
 
            if not username:
 
                self.authenticate.realm = \
 
                    safe_str(self.config['realm'])
 
                result = self.authenticate(environ)
 
                if isinstance(result, str):
 
                    AUTH_TYPE.update(environ, 'basic')
 
                    REMOTE_USER.update(environ, result)
 
                    username = result
 
                else:
 
                    return result.wsgi_application(environ, start_response)
 

	
 
            #==============================================================
 
            # CHECK PERMISSIONS FOR THIS REQUEST USING GIVEN USERNAME
 
            #==============================================================
 
            try:
 
                user = User.get_by_username_or_email(username)
 
                if user is None or not user.active:
 
                    return HTTPForbidden()(environ, start_response)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                return HTTPInternalServerError()(environ, start_response)
 

	
 
            #check permissions for this repository
 
            perm = self._check_permission(action, user, repo_name, ip_addr)
 
            if not perm:
 
                return HTTPForbidden()(environ, start_response)
 
        user, response_app = self._authorize(environ, start_response, action, repo_name, ip_addr)
 
        if response_app is not None:
 
            return response_app(environ, start_response)
 

	
 
        # extras are injected into UI object and later available
 
        # in hooks executed by kallithea
 
        from kallithea import CONFIG
 
        server_url = get_server_url(environ)
 
        extras = {
 
            'ip': ip_addr,
 
            'username': user.username,
 
            'action': action,
 
            'repository': repo_name,
 
            'scm': 'git',
 
            'config': CONFIG['__file__'],
 
            'server_url': server_url,
 
            'make_lock': None,
 
            'locked_by': [None, None]
 
        }
 

	
 
        #===================================================================
 
        # GIT REQUEST HANDLING
 
        #===================================================================
 
        repo_path = os.path.join(safe_str(self.basepath),str_repo_name)
 
        log.debug('Repository path is %s', repo_path)
 

	
 
        # CHECK LOCKING only if it's not ANONYMOUS USER
 
        if user.username != User.DEFAULT_USER:
 
            log.debug('Checking locking on repository')
 
            (make_lock,
 
             locked,
 
             locked_by) = self._check_locking_state(
 
                            environ=environ, action=action,
 
                            repo=repo_name, user_id=user.user_id
 
                       )
 
            # store the make_lock for later evaluation in hooks
 
            extras.update({'make_lock': make_lock,
 
                           'locked_by': locked_by})
 

	
 
        fix_PATH()
 
        log.debug('HOOKS extras is %s', extras)
 
        baseui = make_ui('db')
 
        self.__inject_extras(repo_path, baseui, extras)
 

	
 
        try:
 
            self._handle_githooks(repo_name, action, baseui, environ)
 
            log.info('%s action on Git repo "%s" by "%s" from %s',
 
                     action, str_repo_name, safe_str(user.username), ip_addr)
 
            app = self.__make_app(repo_name, repo_path, extras)
 
            result = app(environ, start_response)
 
            if action == 'push':
kallithea/lib/middleware/simplehg.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.middleware.simplehg
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
SimpleHg middleware for handling mercurial protocol request
 
(push/clone etc.). It's implemented with basic auth function
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 28, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 

	
 
import os
 
import logging
 
import traceback
 

	
 
from paste.httpheaders import REMOTE_USER, AUTH_TYPE
 
from webob.exc import HTTPNotFound, HTTPForbidden, HTTPInternalServerError, \
 
    HTTPNotAcceptable, HTTPBadRequest
 
from kallithea.model.db import User
 

	
 
from kallithea.lib.utils2 import safe_str, safe_unicode, fix_PATH, get_server_url, \
 
    _set_extras
 
from kallithea.lib.base import BaseVCSController, WSGIResultCloseCallback
 
from kallithea.lib.utils import make_ui, is_valid_repo, ui_sections
 
from kallithea.lib.vcs.utils.hgcompat import RepoError, hgweb_mod
 
from kallithea.lib.exceptions import HTTPLockedRC
 
from kallithea.lib import auth_modules
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def is_mercurial(environ):
 
    """
 
    Returns True if request's target is mercurial server - header
 
    ``HTTP_ACCEPT`` of such request would start with ``application/mercurial``.
 
    """
 
    http_accept = environ.get('HTTP_ACCEPT')
 
    path_info = environ['PATH_INFO']
 
    if http_accept and http_accept.startswith('application/mercurial'):
 
        ishg_path = True
 
    else:
 
        ishg_path = False
 

	
 
    log.debug('pathinfo: %s detected as Mercurial %s',
 
        path_info, ishg_path
 
    )
 
    return ishg_path
 

	
 
class SimpleHg(BaseVCSController):
 

	
 
    def _handle_request(self, environ, start_response):
 
        if not is_mercurial(environ):
 
            return self.application(environ, start_response)
 

	
 
        ip_addr = self._get_ip_addr(environ)
 
        # skip passing error to error controller
 
        environ['pylons.status_code_redirect'] = True
 

	
 
        #======================================================================
 
        # EXTRACT REPOSITORY NAME FROM ENV
 
        #======================================================================
 
        try:
 
            str_repo_name = environ['REPO_NAME'] = self.__get_repository(environ)
 
            assert isinstance(str_repo_name, str), str_repo_name
 
            repo_name = safe_unicode(str_repo_name)
 
            assert safe_str(repo_name) == str_repo_name, (str_repo_name, repo_name)
 
            log.debug('Extracted repo name is %s', repo_name)
 
        except Exception as e:
 
            log.error('error extracting repo_name: %r', e)
 
            return HTTPInternalServerError()(environ, start_response)
 

	
 
        # quick check if that dir exists...
 
        if not is_valid_repo(repo_name, self.basepath, 'hg'):
 
            return HTTPNotFound()(environ, start_response)
 

	
 
        #======================================================================
 
        # GET ACTION PULL or PUSH
 
        #======================================================================
 
        try:
 
            action = self.__get_action(environ)
 
        except HTTPBadRequest as e:
 
            return e(environ, start_response)
 

	
 
        #======================================================================
 
        # CHECK PERMISSIONS
 
        #======================================================================
 
        anonymous_user = User.get_default_user(cache=True)
 
        user = anonymous_user
 
        if anonymous_user.active:
 
            # ONLY check permissions if the user is activated
 
            anonymous_perm = self._check_permission(action, anonymous_user,
 
                                                    repo_name, ip_addr)
 
        else:
 
            anonymous_perm = False
 

	
 
        if not anonymous_user.active or not anonymous_perm:
 
            if not anonymous_user.active:
 
                log.debug('Anonymous access is disabled, running '
 
                          'authentication')
 

	
 
            if not anonymous_perm:
 
                log.debug('Not enough credentials to access this '
 
                          'repository as anonymous user')
 

	
 
            username = None
 
            #==============================================================
 
            # DEFAULT PERM FAILED OR ANONYMOUS ACCESS IS DISABLED SO WE
 
            # NEED TO AUTHENTICATE AND ASK FOR AUTH USER PERMISSIONS
 
            #==============================================================
 

	
 
            # try to auth based on environ, container auth methods
 
            log.debug('Running PRE-AUTH for container based authentication')
 
            pre_auth = auth_modules.authenticate('', '', environ)
 
            if pre_auth is not None and pre_auth.get('username'):
 
                username = pre_auth['username']
 
            log.debug('PRE-AUTH got %s as username', username)
 

	
 
            # If not authenticated by the container, running basic auth
 
            if not username:
 
                self.authenticate.realm = \
 
                    safe_str(self.config['realm'])
 
                result = self.authenticate(environ)
 
                if isinstance(result, str):
 
                    AUTH_TYPE.update(environ, 'basic')
 
                    REMOTE_USER.update(environ, result)
 
                    username = result
 
                else:
 
                    return result.wsgi_application(environ, start_response)
 

	
 
            #==============================================================
 
            # CHECK PERMISSIONS FOR THIS REQUEST USING GIVEN USERNAME
 
            #==============================================================
 
            try:
 
                user = User.get_by_username_or_email(username)
 
                if user is None or not user.active:
 
                    return HTTPForbidden()(environ, start_response)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                return HTTPInternalServerError()(environ, start_response)
 

	
 
            #check permissions for this repository
 
            perm = self._check_permission(action, user, repo_name, ip_addr)
 
            if not perm:
 
                return HTTPForbidden()(environ, start_response)
 
        user, response_app = self._authorize(environ, start_response, action, repo_name, ip_addr)
 
        if response_app is not None:
 
            return response_app(environ, start_response)
 

	
 
        # extras are injected into mercurial UI object and later available
 
        # in hg hooks executed by kallithea
 
        from kallithea import CONFIG
 
        server_url = get_server_url(environ)
 
        extras = {
 
            'ip': ip_addr,
 
            'username': user.username,
 
            'action': action,
 
            'repository': repo_name,
 
            'scm': 'hg',
 
            'config': CONFIG['__file__'],
 
            'server_url': server_url,
 
            'make_lock': None,
 
            'locked_by': [None, None]
 
        }
 
        #======================================================================
 
        # MERCURIAL REQUEST HANDLING
 
        #======================================================================
 
        repo_path = os.path.join(safe_str(self.basepath), str_repo_name)
 
        log.debug('Repository path is %s', repo_path)
 

	
 
        # CHECK LOCKING only if it's not ANONYMOUS USER
 
        if user.username != User.DEFAULT_USER:
 
            log.debug('Checking locking on repository')
 
            (make_lock,
 
             locked,
 
             locked_by) = self._check_locking_state(
 
                            environ=environ, action=action,
 
                            repo=repo_name, user_id=user.user_id
 
                       )
 
            # store the make_lock for later evaluation in hooks
 
            extras.update({'make_lock': make_lock,
 
                           'locked_by': locked_by})
 

	
 
        fix_PATH()
 
        log.debug('HOOKS extras is %s', extras)
 
        baseui = make_ui('db')
 
        self.__inject_extras(repo_path, baseui, extras)
 

	
 
        try:
 
            log.info('%s action on Mercurial repo "%s" by "%s" from %s',
 
                     action, str_repo_name, safe_str(user.username), ip_addr)
 
            app = self.__make_app(repo_path, baseui, extras)
 
            result = app(environ, start_response)
 
            if action == 'push':
 
                result = WSGIResultCloseCallback(result,
 
                    lambda: self._invalidate_cache(repo_name))
0 comments (0 inline, 0 general)