Changeset - dd87009b518b
[Not reviewed]
default
0 5 0
Mads Kiilerich - 10 years ago 2015-07-31 15:44:07
madski@unity3d.com
auth: various minor cleanup of general auth functionality
5 files changed with 23 insertions and 21 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth_modules/__init__.py
Show inline comments
 
@@ -34,52 +34,52 @@ log = logging.getLogger(__name__)
 
class LazyFormencode(object):
 
    def __init__(self, formencode_obj, *args, **kwargs):
 
        self.formencode_obj = formencode_obj
 
        self.args = args
 
        self.kwargs = kwargs
 

	
 
    def __call__(self, *args, **kwargs):
 
        from inspect import isfunction
 
        formencode_obj = self.formencode_obj
 
        if isfunction(formencode_obj):
 
            #case we wrap validators into functions
 
            formencode_obj = self.formencode_obj(*args, **kwargs)
 
        return formencode_obj(*self.args, **self.kwargs)
 

	
 

	
 
class KallitheaAuthPluginBase(object):
 
    auth_func_attrs = {
 
        "username": "unique username",
 
        "firstname": "first name",
 
        "lastname": "last name",
 
        "email": "email address",
 
        "groups": '["list", "of", "groups"]',
 
        "extern_name": "name in external source of record",
 
        "extern_type": "type of external source of record",
 
        "admin": 'True|False defines if user should be Kallithea super admin',
 
        "active": 'True|False defines active state of user internally for Kallithea',
 
        "active_from_extern": "True|False\None, active state from the external auth, "
 
                              "None means use definition from Kallithea extern_type active value"
 
        "admin": 'True|False defines if user should be Kallithea admin',
 
        "active": 'True|False defines active state of user in Kallithea',
 
        "active_from_extern": "True|False|None, active state from the external auth, "
 
                              "None means use value from the auth plugin"
 
    }
 

	
 
    @property
 
    def validators(self):
 
        """
 
        Exposes Kallithea validators modules
 
        """
 
        # this is a hack to overcome issues with pylons threadlocals and
 
        # translator object _() not being registered properly.
 
        class LazyCaller(object):
 
            def __init__(self, name):
 
                self.validator_name = name
 

	
 
            def __call__(self, *args, **kwargs):
 
                from kallithea.model import validators as v
 
                obj = getattr(v, self.validator_name)
 
                #log.debug('Initializing lazy formencode object: %s' % obj)
 
                return LazyFormencode(obj, *args, **kwargs)
 

	
 

	
 
        class ProxyGet(object):
 
            def __getattribute__(self, name):
 
                return LazyCaller(name)
 

	
 
@@ -163,124 +163,124 @@ class KallitheaAuthPluginBase(object):
 
            } [, ...]
 
        ]
 

	
 
        This is used to interrogate the authentication plugin as to what
 
        settings it expects to be present and configured.
 

	
 
        'type' is a shorthand notation for what kind of value this option is.
 
        This is primarily used by the auth web form to control how the option
 
        is configured.
 
                bool : checkbox
 
                password : password input box
 
                string : input box
 
                select : single select dropdown
 

	
 
        'validator' is an lazy instantiated form field validator object, ala
 
        formencode. You need to *call* this object to init the validators.
 
        All calls to Kallithea validators should be used through self.validators
 
        which is a lazy loading proxy of formencode module.
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def plugin_settings(self):
 
        """
 
        This method is called by the authentication framework, not the .settings()
 
        method. This method adds a few default settings (e.g., "active"), so that
 
        method. This method adds a few default settings (e.g., "enabled"), so that
 
        plugin authors don't have to maintain a bunch of boilerplate.
 

	
 
        OVERRIDING THIS METHOD WILL CAUSE YOUR PLUGIN TO FAIL.
 
        """
 

	
 
        rcsettings = self.settings()
 
        rcsettings.insert(0, {
 
            "name": "enabled",
 
            "validator": self.validators.StringBoolean(if_missing=False),
 
            "type": "bool",
 
            "description": "Enable or Disable this Authentication Plugin",
 
            "formname": "Enabled"
 
            }
 
        )
 
        return rcsettings
 

	
 
    def user_activation_state(self):
 
        """
 
        Defines user activation state when creating new users
 

	
 
        :returns: boolean
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def auth(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Given a user object (which may be null), username, a plaintext password,
 
        Given a user object (which may be None), username, a plaintext password,
 
        and a settings object (containing all the keys needed as listed in settings()),
 
        authenticate this user's login attempt.
 

	
 
        Return None on failure. On success, return a dictionary of the form:
 
        Return None on failure. On success, return a dictionary with keys from
 
        KallitheaAuthPluginBase.auth_func_attrs.
 

	
 
            see: KallitheaAuthPluginBase.auth_func_attrs
 
        This is later validated for correctness
 
        This is later validated for correctness.
 
        """
 
        raise NotImplementedError("not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Wrapper to call self.auth() that validates call on it
 

	
 
        :param userobj: userobj
 
        :param username: username
 
        :param passwd: plaintext password
 
        :param settings: plugin settings
 
        """
 
        auth = self.auth(userobj, username, passwd, settings, **kwargs)
 
        if auth:
 
        if auth is not None:
 
            return self._validate_auth_return(auth)
 
        return auth
 
        return None
 

	
 
    def _validate_auth_return(self, ret):
 
        if not isinstance(ret, dict):
 
            raise Exception('returned value from auth must be a dict')
 
        for k in self.auth_func_attrs:
 
            if k not in ret:
 
                raise Exception('Missing %s attribute from returned data' % k)
 
        return ret
 

	
 

	
 
class KallitheaExternalAuthPlugin(KallitheaAuthPluginBase):
 
    def use_fake_password(self):
 
        """
 
        Return a boolean that indicates whether or not we should set the user's
 
        password to a random value when it is authenticated by this plugin.
 
        If your plugin provides authentication, then you will generally want this.
 

	
 
        :returns: boolean
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        auth = super(KallitheaExternalAuthPlugin, self)._authenticate(
 
            userobj, username, passwd, settings, **kwargs)
 
        if auth:
 
        if auth is not None:
 
            # maybe plugin will clean the username ?
 
            # we should use the return value
 
            username = auth['username']
 
            # if user is not active from our extern type we should fail to auth
 
            # this can prevent from creating users in Kallithea when using
 
            # external authentication, but if it's inactive user we shouldn't
 
            # create that user anyway
 
            if auth['active_from_extern'] is False:
 
                log.warning("User %s authenticated against %s, but is inactive"
 
                            % (username, self.__module__))
 
                return None
 

	
 
            if self.use_fake_password():
 
                # Randomize the PW because we don't need it, but don't want
 
                # them blank either
 
                passwd = PasswordGenerator().gen_password(length=8)
 

	
 
            log.debug('Updating or creating user info from %s plugin'
 
                      % self.name)
 
            user = UserModel().create_or_update(
 
                username=username,
 
                password=passwd,
 
                email=auth["email"],
 
                firstname=auth["firstname"],
 
@@ -387,33 +387,33 @@ def authenticate(username, password, env
 

	
 
        # use plugin's method of user extraction.
 
        user = plugin.get_user(username, environ=environ,
 
                               settings=plugin_settings)
 
        log.debug('Plugin %s extracted user is `%s`' % (module, user))
 
        if not plugin.accepts(user):
 
            log.debug('Plugin %s does not accept user `%s` for authentication'
 
                      % (module, user))
 
            continue
 
        else:
 
            log.debug('Plugin %s accepted user `%s` for authentication'
 
                      % (module, user))
 

	
 
        log.info('Authenticating user using %s plugin' % plugin.__module__)
 
        # _authenticate is a wrapper for .auth() method of plugin.
 
        # it checks if .auth() sends proper data. For KallitheaExternalAuthPlugin
 
        # it also maps users to Database and maps the attributes returned
 
        # from .auth() to Kallithea database. If this function returns data
 
        # then auth is correct.
 
        plugin_user = plugin._authenticate(user, username, password,
 
                                           plugin_settings,
 
                                           environ=environ or {})
 
        log.debug('PLUGIN USER DATA: %s' % plugin_user)
 

	
 
        if plugin_user:
 
        if plugin_user is not None:
 
            log.debug('Plugin returned proper authentication data')
 
            return plugin_user
 

	
 
        # we failed to Auth because .auth() method didn't return proper the user
 
        # we failed to Auth because .auth() method didn't return the user
 
        if username:
 
            log.warning("User `%s` failed to authenticate against %s"
 
                        % (username, plugin.__module__))
 
    return None
kallithea/lib/base.py
Show inline comments
 
@@ -144,63 +144,63 @@ class BasicAuth(paste.auth.basic.AuthBas
 
    def __init__(self, realm, authfunc, auth_http_code=None):
 
        self.realm = realm
 
        self.authfunc = authfunc
 
        self._rc_auth_http_code = auth_http_code
 

	
 
    def build_authentication(self):
 
        head = paste.httpheaders.WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
 
        if self._rc_auth_http_code and self._rc_auth_http_code == '403':
 
            # return 403 if alternative http return code is specified in
 
            # Kallithea config
 
            return paste.httpexceptions.HTTPForbidden(headers=head)
 
        return paste.httpexceptions.HTTPUnauthorized(headers=head)
 

	
 
    def authenticate(self, environ):
 
        authorization = paste.httpheaders.AUTHORIZATION(environ)
 
        if not authorization:
 
            return self.build_authentication()
 
        (authmeth, auth) = authorization.split(' ', 1)
 
        if 'basic' != authmeth.lower():
 
            return self.build_authentication()
 
        auth = auth.strip().decode('base64')
 
        _parts = auth.split(':', 1)
 
        if len(_parts) == 2:
 
            username, password = _parts
 
            if self.authfunc(username, password, environ):
 
            if self.authfunc(username, password, environ) is not None:
 
                return username
 
        return self.build_authentication()
 

	
 
    __call__ = authenticate
 

	
 

	
 
class BaseVCSController(object):
 

	
 
    def __init__(self, application, config):
 
        self.application = application
 
        self.config = config
 
        # base path of repo locations
 
        self.basepath = self.config['base_path']
 
        #authenticate this VCS request using authfunc
 
        # authenticate this VCS request using the authentication modules
 
        self.authenticate = BasicAuth('', auth_modules.authenticate,
 
                                      config.get('auth_ret_code'))
 
        self.ip_addr = '0.0.0.0'
 

	
 
    def _handle_request(self, environ, start_response):
 
        raise NotImplementedError()
 

	
 
    def _get_by_id(self, repo_name):
 
        """
 
        Gets a special pattern _<ID> from clone url and tries to replace it
 
        with a repository_name for support of _<ID> permanent URLs
 

	
 
        :param repo_name:
 
        """
 

	
 
        data = repo_name.split('/')
 
        if len(data) >= 2:
 
            from kallithea.lib.utils import get_repo_by_id
 
            by_id_match = get_repo_by_id(repo_name)
 
            if by_id_match:
 
                data[1] = by_id_match
 

	
 
        return '/'.join(data)
 

	
 
@@ -392,49 +392,49 @@ class BaseController(WSGIController):
 
        # In ancient login sessions, 'authuser' may not be a dict.
 
        # In that case, the user will have to log in again.
 
        if isinstance(session_authuser, dict):
 
            try:
 
                return AuthUser.from_cookie(session_authuser)
 
            except UserCreationError as e:
 
                # container auth or other auth functions that create users on
 
                # the fly can throw UserCreationError to signal issues with
 
                # user creation. Explanation should be provided in the
 
                # exception object.
 
                from kallithea.lib import helpers as h
 
                h.flash(e, 'error', logf=log.error)
 

	
 
        # Authenticate by auth_container plugin (if enabled)
 
        if any(
 
            auth_modules.importplugin(name).is_container_auth
 
            for name in Setting.get_auth_plugins()
 
        ):
 
            try:
 
                auth_info = auth_modules.authenticate('', '', request.environ)
 
            except UserCreationError as e:
 
                from kallithea.lib import helpers as h
 
                h.flash(e, 'error', logf=log.error)
 
            else:
 
                if auth_info:
 
                if auth_info is not None:
 
                    username = auth_info['username']
 
                    user = User.get_by_username(username, case_insensitive=True)
 
                    return log_in_user(user, remember=False,
 
                                       is_external_auth=True)
 

	
 
        # User is anonymous
 
        return AuthUser()
 

	
 
    def __call__(self, environ, start_response):
 
        """Invoke the Controller"""
 

	
 
        # WSGIController.__call__ dispatches to the Controller method
 
        # the request is routed to. This routing information is
 
        # available in environ['pylons.routes_dict']
 
        try:
 
            self.ip_addr = _get_ip_addr(environ)
 
            # make sure that we update permissions each time we call controller
 

	
 
            #set globals for auth user
 
            self.authuser = c.authuser = request.user = self._determine_auth_user(
 
                request.GET.get('api_key'),
 
                session.get('authuser'),
 
            )
 

	
kallithea/lib/middleware/simplegit.py
Show inline comments
 
@@ -103,49 +103,49 @@ class SimpleGit(BaseVCSController):
 
                # ONLY check permissions if the user is activated
 
                anonymous_perm = self._check_permission(action, anonymous_user,
 
                                                        repo_name, ip_addr)
 
            else:
 
                anonymous_perm = False
 

	
 
            if not anonymous_user.active or not anonymous_perm:
 
                if not anonymous_user.active:
 
                    log.debug('Anonymous access is disabled, running '
 
                              'authentication')
 

	
 
                if not anonymous_perm:
 
                    log.debug('Not enough credentials to access this '
 
                              'repository as anonymous user')
 

	
 
                username = None
 
                #==============================================================
 
                # DEFAULT PERM FAILED OR ANONYMOUS ACCESS IS DISABLED SO WE
 
                # NEED TO AUTHENTICATE AND ASK FOR AUTH USER PERMISSIONS
 
                #==============================================================
 

	
 
                # try to auth based on environ, container auth methods
 
                log.debug('Running PRE-AUTH for container based authentication')
 
                pre_auth = auth_modules.authenticate('', '', environ)
 
                if pre_auth and pre_auth.get('username'):
 
                if pre_auth is not None and pre_auth.get('username'):
 
                    username = pre_auth['username']
 
                log.debug('PRE-AUTH got %s as username' % username)
 

	
 
                # If not authenticated by the container, running basic auth
 
                if not username:
 
                    self.authenticate.realm = \
 
                        safe_str(self.config['realm'])
 
                    result = self.authenticate(environ)
 
                    if isinstance(result, str):
 
                        AUTH_TYPE.update(environ, 'basic')
 
                        REMOTE_USER.update(environ, result)
 
                        username = result
 
                    else:
 
                        return result.wsgi_application(environ, start_response)
 

	
 
                #==============================================================
 
                # CHECK PERMISSIONS FOR THIS REQUEST USING GIVEN USERNAME
 
                #==============================================================
 
                try:
 
                    user = self.__get_user(username)
 
                    if user is None or not user.active:
 
                        return HTTPForbidden()(environ, start_response)
 
                    username = user.username
 
                except Exception:
kallithea/lib/middleware/simplehg.py
Show inline comments
 
@@ -107,49 +107,49 @@ class SimpleHg(BaseVCSController):
 
                # ONLY check permissions if the user is activated
 
                anonymous_perm = self._check_permission(action, anonymous_user,
 
                                                        repo_name, ip_addr)
 
            else:
 
                anonymous_perm = False
 

	
 
            if not anonymous_user.active or not anonymous_perm:
 
                if not anonymous_user.active:
 
                    log.debug('Anonymous access is disabled, running '
 
                              'authentication')
 

	
 
                if not anonymous_perm:
 
                    log.debug('Not enough credentials to access this '
 
                              'repository as anonymous user')
 

	
 
                username = None
 
                #==============================================================
 
                # DEFAULT PERM FAILED OR ANONYMOUS ACCESS IS DISABLED SO WE
 
                # NEED TO AUTHENTICATE AND ASK FOR AUTH USER PERMISSIONS
 
                #==============================================================
 

	
 
                # try to auth based on environ, container auth methods
 
                log.debug('Running PRE-AUTH for container based authentication')
 
                pre_auth = auth_modules.authenticate('', '', environ)
 
                if pre_auth and pre_auth.get('username'):
 
                if pre_auth is not None and pre_auth.get('username'):
 
                    username = pre_auth['username']
 
                log.debug('PRE-AUTH got %s as username' % username)
 

	
 
                # If not authenticated by the container, running basic auth
 
                if not username:
 
                    self.authenticate.realm = \
 
                        safe_str(self.config['realm'])
 
                    result = self.authenticate(environ)
 
                    if isinstance(result, str):
 
                        AUTH_TYPE.update(environ, 'basic')
 
                        REMOTE_USER.update(environ, result)
 
                        username = result
 
                    else:
 
                        return result.wsgi_application(environ, start_response)
 

	
 
                #==============================================================
 
                # CHECK PERMISSIONS FOR THIS REQUEST USING GIVEN USERNAME
 
                #==============================================================
 
                try:
 
                    user = self.__get_user(username)
 
                    if user is None or not user.active:
 
                        return HTTPForbidden()(environ, start_response)
 
                    username = user.username
 
                except Exception:
kallithea/model/validators.py
Show inline comments
 
@@ -251,86 +251,88 @@ def ValidRepoGroup(edit=False, old_data=
 
def ValidPassword():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password':
 
                _('Invalid characters (non-ascii) in password')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                (value or '').decode('ascii')
 
            except UnicodeError:
 
                msg = M(self, 'invalid_password', state)
 
                raise formencode.Invalid(msg, value, state,)
 
    return _validator
 

	
 

	
 
def ValidOldPassword(username):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password': _('Invalid old password')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 
            if not auth_modules.authenticate(username, value, ''):
 
            if auth_modules.authenticate(username, value, '') is None:
 
                msg = M(self, 'invalid_password', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(current_password=msg)
 
                )
 
    return _validator
 

	
 

	
 
def ValidPasswordsMatch(password_field, password_confirmation_field):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'password_mismatch': _('Passwords do not match'),
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value.get(password_field) != value[password_confirmation_field]:
 
                msg = M(self, 'password_mismatch', state)
 
                raise formencode.Invalid(msg, value, state,
 
                     error_dict={password_field:msg, password_confirmation_field: msg}
 
                )
 
    return _validator
 

	
 

	
 
def ValidAuth():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password': _('Invalid password'),
 
            'invalid_username': _('Invalid username'),
 
            'disabled_account': _('Account has been disabled')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 

	
 
            password = value['password']
 
            username = value['username']
 

	
 
            if not auth_modules.authenticate(username, password):
 
            # authenticate returns unused dict but has called
 
            # plugin._authenticate which has create_or_update'ed the username user in db
 
            if auth_modules.authenticate(username, password) is None:
 
                user = User.get_by_username(username)
 
                if user and not user.active:
 
                    log.warning('user %s is disabled' % username)
 
                    msg = M(self, 'disabled_account', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=msg)
 
                    )
 
                else:
 
                    log.warning('user %s failed to authenticate' % username)
 
                    msg = M(self, 'invalid_username', state)
 
                    msg2 = M(self, 'invalid_password', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=msg, password=msg2)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidAuthToken():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_token': _('Token mismatch')
 
        }
 

	
 
        def validate_python(self, value, state):
0 comments (0 inline, 0 general)