Changeset - c9d859a89a88
[Not reviewed]
default
0 6 0
Mads Kiilerich - 7 years ago 2018-12-26 01:54:23
mads@kiilerich.com
auth: move 'active' handling out of the individual auth modules

The 'active' flag in the Kallithea user database is very fundamental and should
not be specific to auth modules. Modules should only care about whether the
user is active in the external authentication system.

user_activation_state is thus removed, and 'hg.extern_activate.auto' is now
consistently checked for all kinds of external authentication.
6 files changed with 27 insertions and 58 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth_modules/__init__.py
Show inline comments
 
@@ -45,25 +45,24 @@ class LazyFormencode(object):
 
        return formencode_obj(*self.args, **self.kwargs)
 

	
 

	
 
class KallitheaAuthPluginBase(object):
 
    auth_func_attrs = {
 
        "username": "unique username",
 
        "firstname": "first name",
 
        "lastname": "last name",
 
        "email": "email address",
 
        "groups": '["list", "of", "groups"]',
 
        "extern_name": "name in external source of record",
 
        "admin": 'True|False defines if user should be Kallithea admin',
 
        "active": 'True|False defines active state of user in Kallithea',
 
    }
 

	
 
    @property
 
    def validators(self):
 
        """
 
        Exposes Kallithea validators modules
 
        """
 
        # this is a hack to overcome issues with pylons threadlocals and
 
        # translator object _() not being registered properly.
 
        class LazyCaller(object):
 
            def __init__(self, name):
 
                self.validator_name = name
 
@@ -187,53 +186,40 @@ class KallitheaAuthPluginBase(object):
 

	
 
        rcsettings = self.settings()
 
        rcsettings.insert(0, {
 
            "name": "enabled",
 
            "validator": self.validators.StringBoolean(if_missing=False),
 
            "type": "bool",
 
            "description": "Enable or Disable this Authentication Plugin",
 
            "formname": "Enabled"
 
            }
 
        )
 
        return rcsettings
 

	
 
    def user_activation_state(self):
 
        """
 
        Defines user activation state when creating new users
 

	
 
        :returns: boolean
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def auth(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Given a user object (which may be None), username, a plaintext password,
 
        and a settings object (containing all the keys needed as listed in settings()),
 
        authenticate this user's login attempt.
 

	
 
        Return None on failure. On success, return a dictionary with keys from
 
        KallitheaAuthPluginBase.auth_func_attrs.
 

	
 
        This is later validated for correctness.
 
        """
 
        raise NotImplementedError("not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Wrapper to call self.auth() that validates call on it
 

	
 
        :param userobj: userobj
 
        :param username: username
 
        :param passwd: plaintext password
 
        :param settings: plugin settings
 
        """
 
        user_data = self.auth(userobj, username, passwd, settings, **kwargs)
 
        if user_data is not None:
 
            return self._validate_auth_return(user_data)
 
        return None
 

	
 
    def _validate_auth_return(self, user_data):
 
        if not isinstance(user_data, dict):
 
            raise Exception('returned value from auth must be a dict')
 
        for k in self.auth_func_attrs:
 
            if k not in user_data:
 
                raise Exception('Missing %s attribute from returned data' % k)
 
@@ -246,41 +232,47 @@ class KallitheaExternalAuthPlugin(Kallit
 
        Return a boolean that indicates whether or not we should set the user's
 
        password to a random value when it is authenticated by this plugin.
 
        If your plugin provides authentication, then you will generally want this.
 

	
 
        :returns: boolean
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        user_data = super(KallitheaExternalAuthPlugin, self)._authenticate(
 
            userobj, username, passwd, settings, **kwargs)
 
        if user_data is not None:
 
            if userobj is None: # external authentication of unknown user that will be created soon
 
                def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
                active = 'hg.extern_activate.auto' in def_user_perms
 
            else:
 
                active = userobj.active
 

	
 
            if self.use_fake_password():
 
                # Randomize the PW because we don't need it, but don't want
 
                # them blank either
 
                passwd = PasswordGenerator().gen_password(length=8)
 

	
 
            log.debug('Updating or creating user info from %s plugin',
 
                      self.name)
 
            user = UserModel().create_or_update(
 
                username=user_data['username'],
 
                password=passwd,
 
                email=user_data["email"],
 
                firstname=user_data["firstname"],
 
                lastname=user_data["lastname"],
 
                active=user_data["active"],
 
                active=active,
 
                admin=user_data["admin"],
 
                extern_name=user_data["extern_name"],
 
                extern_type=self.name
 
                extern_type=self.name,
 
            )
 
            # enforce user is just in given groups, all of them has to be ones
 
            # created from plugins. We store this info in _group_data JSON field
 
            groups = user_data['groups'] or []
 
            UserGroupModel().enforce_groups(user, groups, self.name)
 
            Session().commit()
 
        return user_data
 

	
 

	
 
def loadplugin(plugin):
 
    """
 
    Imports, instantiates, and returns the authentication plugin in the module named by plugin
 
@@ -360,24 +352,29 @@ def authenticate(username, password, env
 
            plugin_settings[v["name"]] = setting.app_settings_value if setting else None
 
        log.debug('Settings for auth plugin %s:\n%s', plugin_name, formatted_json(plugin_settings))
 

	
 
        if not str2bool(plugin_settings["enabled"]):
 
            log.info("Authentication plugin %s is disabled, skipping for %s",
 
                     module, username)
 
            continue
 

	
 
        # use plugin's method of user extraction.
 
        user = plugin.get_user(username, environ=environ,
 
                               settings=plugin_settings)
 
        log.debug('Plugin %s extracted user `%s`', module, user)
 

	
 
        if user is not None and not user.active:
 
            log.error("Rejecting authentication of in-active user %s", user)
 
            continue
 

	
 
        if not plugin.accepts(user):
 
            log.debug('Plugin %s does not accept user `%s` for authentication',
 
                      module, user)
 
            continue
 
        else:
 
            log.debug('Plugin %s accepted user `%s` for authentication',
 
                      module, user)
 
            # The user might have tried to authenticate using their email address,
 
            # then the username variable wouldn't contain a valid username.
 
            # But as the plugin has accepted the user, .username field should
 
            # have a valid username, so use it for authentication purposes.
 
            if user is not None:
 
@@ -399,22 +396,22 @@ def authenticate(username, password, env
 
            log.debug('Plugin returned proper authentication data')
 
            return user_data
 

	
 
        # we failed to Auth because .auth() method didn't return the user
 
        if username:
 
            log.warning("User `%s` failed to authenticate against %s",
 
                        username, module)
 
    return None
 

	
 

	
 
def get_managed_fields(user):
 
    """return list of fields that are managed by the user's auth source, usually some of
 
    'username', 'firstname', 'lastname', 'email', 'active', 'password'
 
    'username', 'firstname', 'lastname', 'email', 'password'
 
    """
 
    auth_plugins = get_auth_plugins()
 
    for plugin in auth_plugins:
 
        module = plugin.__class__.__module__
 
        log.debug('testing %s (%s) with auth plugin %s', user, user.extern_type, module)
 
        if plugin.name == user.extern_type:
 
            return plugin.get_managed_fields()
 
    log.error('no auth plugin %s found for %s', user.extern_type, user)
 
    return [] # TODO: Fail badly instead of allowing everything to be edited?
kallithea/lib/auth_modules/auth_container.py
Show inline comments
 
@@ -96,28 +96,24 @@ class KallitheaAuthPlugin(auth_modules.K
 
                "description": "Perform cleaning of user, if passed user has @ in username "
 
                               "then first part before @ is taken. "
 
                               "If there's \\ in the username only the part after \\ is taken",
 
                "default": "True",
 
                "formname": "Clean username"
 
            },
 
        ]
 
        return settings
 

	
 
    def use_fake_password(self):
 
        return True
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.extern_activate.auto' in def_user_perms
 

	
 
    def _clean_username(self, username):
 
        # Removing realm and domain from username
 
        username = username.partition('@')[0]
 
        username = username.rpartition('\\')[2]
 
        return username
 

	
 
    def _get_username(self, environ, settings):
 
        username = None
 
        environ = environ or {}
 
        if not environ:
 
            log.debug('got empty environ: %s', environ)
 

	
 
@@ -186,37 +182,35 @@ class KallitheaAuthPlugin(auth_modules.K
 

	
 
        if not username:
 
            # we don't have any objects in DB, user doesn't exist, extract
 
            # username from environ based on the settings
 
            username = self._get_username(environ, settings)
 

	
 
        # if cannot fetch username, it's a no-go for this plugin to proceed
 
        if not username:
 
            return None
 

	
 
        # old attrs fetched from Kallithea database
 
        admin = getattr(userobj, 'admin', False)
 
        active = getattr(userobj, 'active', True)
 
        email = environ.get(settings.get('email_header'), getattr(userobj, 'email', ''))
 
        firstname = environ.get(settings.get('firstname_header'), getattr(userobj, 'firstname', ''))
 
        lastname = environ.get(settings.get('lastname_header'), getattr(userobj, 'lastname', ''))
 

	
 
        user_data = {
 
            'username': username,
 
            'firstname': safe_unicode(firstname or username),
 
            'lastname': safe_unicode(lastname or ''),
 
            'groups': [],
 
            'email': email or '',
 
            'admin': admin or False,
 
            'active': active,
 
            'extern_name': username,
 
        }
 

	
 
        log.info('user `%s` authenticated correctly', user_data['username'])
 
        return user_data
 

	
 
    def get_managed_fields(self):
 
        fields = ['username', 'password']
 
        if(Setting.get_by_name('auth_container_email_header').app_settings_value):
 
            fields.append('email')
 
        if(Setting.get_by_name('auth_container_firstname_header').app_settings_value):
 
            fields.append('firstname')
kallithea/lib/auth_modules/auth_crowd.py
Show inline comments
 
@@ -184,28 +184,24 @@ class KallitheaAuthPlugin(auth_modules.K
 
                "name": "admin_groups",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "A comma separated list of group names that identify users as Kallithea Administrators",
 
                "formname": "Admin Groups"
 
            }
 
        ]
 
        return settings
 

	
 
    def use_fake_password(self):
 
        return True
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.extern_activate.auto' in def_user_perms
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        """
 
        Given a user object (which may be null), username, a plaintext password,
 
        and a settings object (containing all the keys needed as listed in settings()),
 
        authenticate this user's login attempt.
 

	
 
        Return None on failure. On success, return a dictionary of the form:
 

	
 
            see: KallitheaAuthPluginBase.auth_func_attrs
 
        This is later validated for correctness
 
        """
 
        if not username or not password:
 
@@ -222,37 +218,35 @@ class KallitheaAuthPlugin(auth_modules.K
 
            return None
 

	
 
        if not crowd_user.get('active'):
 
            log.error('Crowd authentication as %s returned in-active user', username)
 
            return None
 

	
 
        res = server.user_groups(crowd_user["name"])
 
        log.debug("Crowd groups: \n%s", formatted_json(res))
 
        crowd_user["groups"] = [x["name"] for x in res["groups"]]
 

	
 
        # old attrs fetched from Kallithea database
 
        admin = getattr(userobj, 'admin', False)
 
        active = getattr(userobj, 'active', True)
 
        email = getattr(userobj, 'email', '')
 
        firstname = getattr(userobj, 'firstname', '')
 
        lastname = getattr(userobj, 'lastname', '')
 

	
 
        user_data = {
 
            'username': crowd_user["name"] or username,
 
            'firstname': crowd_user["first-name"] or firstname,
 
            'lastname': crowd_user["last-name"] or lastname,
 
            'groups': crowd_user["groups"],
 
            'email': crowd_user["email"] or email,
 
            'admin': admin,
 
            'active': active,
 
            'extern_name': crowd_user["name"],
 
        }
 

	
 
        # set an admin if we're in admin_groups of crowd
 
        for group in settings["admin_groups"].split(","):
 
            if group in user_data["groups"]:
 
                user_data["admin"] = True
 
        log.debug("Final crowd user object: \n%s", formatted_json(user_data))
 
        log.info('user %s authenticated correctly', user_data['username'])
 
        return user_data
 

	
 
    def get_managed_fields(self):
kallithea/lib/auth_modules/auth_internal.py
Show inline comments
 
@@ -38,28 +38,24 @@ log = logging.getLogger(__name__)
 
class KallitheaAuthPlugin(auth_modules.KallitheaAuthPluginBase):
 
    def __init__(self):
 
        pass
 

	
 
    @hybrid_property
 
    def name(self):
 
        # Also found as kallithea.lib.model.db.User.DEFAULT_AUTH_TYPE
 
        return 'internal'
 

	
 
    def settings(self):
 
        return []
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.register.auto_activate' in def_user_perms
 

	
 
    def accepts(self, user, accepts_empty=True):
 
        """
 
        Custom accepts for this auth that doesn't accept empty users. We
 
        know that user exists in database.
 
        """
 
        return super(KallitheaAuthPlugin, self).accepts(user,
 
                                                        accepts_empty=False)
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        if not userobj:
 
            log.debug('userobj was:%s skipping', userobj)
 
            return None
 
@@ -69,37 +65,33 @@ class KallitheaAuthPlugin(auth_modules.K
 
            return None
 
        if not username:
 
            log.debug('Empty username - skipping...')
 
            return None
 

	
 
        user_data = {
 
            "username": userobj.username,
 
            "firstname": userobj.firstname,
 
            "lastname": userobj.lastname,
 
            "groups": [],
 
            "email": userobj.email,
 
            "admin": userobj.admin,
 
            "active": userobj.active,
 
            "extern_name": userobj.user_id,
 
        }
 
        log.debug(formatted_json(user_data))
 

	
 
        log.debug(formatted_json(user_data))
 
        if userobj.active:
 
            from kallithea.lib import auth
 
            password_match = auth.check_password(password, userobj.password)
 
            if userobj.is_default_user and userobj.active:
 
                log.info('user %s authenticated correctly as anonymous user',
 
                         username)
 
                return user_data
 
        from kallithea.lib import auth
 
        password_match = auth.check_password(password, userobj.password)
 
        if userobj.is_default_user:
 
            log.info('user %s authenticated correctly as anonymous user',
 
                     username)
 
            return user_data
 

	
 
            elif userobj.username == username and password_match:
 
                log.info('user %s authenticated correctly', user_data['username'])
 
                return user_data
 
            log.error("user %s had a bad password", username)
 
            return None
 
        else:
 
            log.warning('user %s tried auth but is disabled', username)
 
            return None
 
        elif userobj.username == username and password_match:
 
            log.info('user %s authenticated correctly', user_data['username'])
 
            return user_data
 

	
 
        log.error("user %s had a bad password", username)
 
        return None
 

	
 
    def get_managed_fields(self):
 
        # Note: 'username' should only be editable (at least for user) if self registration is enabled
 
        return []
kallithea/lib/auth_modules/auth_ldap.py
Show inline comments
 
@@ -280,28 +280,24 @@ class KallitheaAuthPlugin(auth_modules.K
 
                "name": "attr_email",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "LDAP Attribute to map to email address",
 
                "formname": "Email Attribute"
 
            }
 
        ]
 
        return settings
 

	
 
    def use_fake_password(self):
 
        return True
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.extern_activate.auto' in def_user_perms
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        """
 
        Given a user object (which may be null), username, a plaintext password,
 
        and a settings object (containing all the keys needed as listed in settings()),
 
        authenticate this user's login attempt.
 

	
 
        Return None on failure. On success, return a dictionary of the form:
 

	
 
            see: KallitheaAuthPluginBase.auth_func_attrs
 
        This is later validated for correctness
 
        """
 

	
 
@@ -330,37 +326,35 @@ class KallitheaAuthPlugin(auth_modules.K
 
            kwargs['bind_pass'] = password
 
        log.debug('Checking for ldap authentication')
 

	
 
        try:
 
            aldap = AuthLdap(**kwargs)
 
            (user_dn, ldap_attrs) = aldap.authenticate_ldap(username, password)
 
            log.debug('Got ldap DN response %s', user_dn)
 

	
 
            get_ldap_attr = lambda k: ldap_attrs.get(settings.get(k), [''])[0]
 

	
 
            # old attrs fetched from Kallithea database
 
            admin = getattr(userobj, 'admin', False)
 
            active = getattr(userobj, 'active', self.user_activation_state())
 
            email = getattr(userobj, 'email', '')
 
            firstname = getattr(userobj, 'firstname', '')
 
            lastname = getattr(userobj, 'lastname', '')
 

	
 
            user_data = {
 
                'username': username,
 
                'firstname': safe_unicode(get_ldap_attr('attr_firstname') or firstname),
 
                'lastname': safe_unicode(get_ldap_attr('attr_lastname') or lastname),
 
                'groups': [],
 
                'email': get_ldap_attr('attr_email') or email,
 
                'admin': admin,
 
                'active': active,
 
                'extern_name': user_dn,
 
            }
 
            log.info('user %s authenticated correctly', user_data['username'])
 
            return user_data
 

	
 
        except LdapUsernameError:
 
            log.info('Error authenticating %s with LDAP: User not found', username)
 
        except LdapPasswordError:
 
            log.info('Error authenticating %s with LDAP: Password error', username)
 
        except LdapImportError:
 
            log.error('Error authenticating %s with LDAP: LDAP not available', username)
 
        return None
kallithea/lib/auth_modules/auth_pam.py
Show inline comments
 
@@ -106,37 +106,35 @@ class KallitheaAuthPlugin(auth_modules.K
 
                    _auth_cache[username] = time.time()
 
            finally:
 
                _pam_lock.release()
 

	
 
            if not auth_result:
 
                log.error("PAM was unable to authenticate user: %s", username)
 
                return None
 
        else:
 
            log.debug("Using cached auth for user: %s", username)
 

	
 
        # old attrs fetched from Kallithea database
 
        admin = getattr(userobj, 'admin', False)
 
        active = getattr(userobj, 'active', True)
 
        email = getattr(userobj, 'email', '') or "%s@%s" % (username, socket.gethostname())
 
        firstname = getattr(userobj, 'firstname', '')
 
        lastname = getattr(userobj, 'lastname', '')
 

	
 
        user_data = {
 
            'username': username,
 
            'firstname': firstname,
 
            'lastname': lastname,
 
            'groups': [g.gr_name for g in grp.getgrall() if username in g.gr_mem],
 
            'email': email,
 
            'admin': admin,
 
            'active': active,
 
            'extern_name': username,
 
        }
 

	
 
        try:
 
            user_pw_data = pwd.getpwnam(username)
 
            regex = settings["gecos"]
 
            match = re.search(regex, user_pw_data.pw_gecos)
 
            if match:
 
                user_data["firstname"] = match.group('first_name')
 
                user_data["lastname"] = match.group('last_name')
 
        except Exception:
 
            log.warning("Cannot extract additional info for PAM user %s", username)
0 comments (0 inline, 0 general)