Changeset - ada6571a6d27
[Not reviewed]
default
0 3 0
domruf - 10 years ago 2015-10-04 20:43:12
dominikruf@gmail.com
auth: let container authentication get email, first and last name from custom headers
3 files changed with 144 insertions and 8 deletions:
0 comments (0 inline, 0 general)
docs/setup.rst
Show inline comments
 
@@ -382,96 +382,164 @@ administrator can then modify it using K
 
It's also possible for an administrator to create accounts and configure their
 
permissions before the user logs in for the first time, using the :ref:`create-user` API.
 

	
 
Container-based authentication
 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 

	
 
In a container-based authentication setup, Kallithea reads the user name from
 
the ``REMOTE_USER`` server variable provided by the WSGI container.
 

	
 
After setting up your container (see `Apache with mod_wsgi`_), you'll need
 
to configure it to require authentication on the location configured for
 
Kallithea.
 

	
 
Proxy pass-through authentication
 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 

	
 
In a proxy pass-through authentication setup, Kallithea reads the user name
 
from the ``X-Forwarded-User`` request header, which should be configured to be
 
sent by the reverse-proxy server.
 

	
 
After setting up your proxy solution (see `Apache virtual host reverse proxy example`_,
 
`Apache as subdirectory`_ or `Nginx virtual host example`_), you'll need to
 
configure the authentication and add the username in a request header named
 
``X-Forwarded-User``.
 

	
 
For example, the following config section for Apache sets a subdirectory in a
 
reverse-proxy setup with basic auth:
 

	
 
.. code-block:: apache
 

	
 
    <Location /someprefix>
 
      ProxyPass http://127.0.0.1:5000/someprefix
 
      ProxyPassReverse http://127.0.0.1:5000/someprefix
 
      SetEnvIf X-Url-Scheme https HTTPS=1
 

	
 
      AuthType Basic
 
      AuthName "Kallithea authentication"
 
      AuthUserFile /srv/kallithea/.htpasswd
 
      Require valid-user
 

	
 
      RequestHeader unset X-Forwarded-User
 

	
 
      RewriteEngine On
 
      RewriteCond %{LA-U:REMOTE_USER} (.+)
 
      RewriteRule .* - [E=RU:%1]
 
      RequestHeader set X-Forwarded-User %{RU}e
 
    </Location>
 

	
 
Setting metadata in container/reverse-proxy
 
'''''''''''''''''''''''''''''''''''''''''''
 

	
 
When a new user account is created on the first login, Kallithea has no information about
 
the user's email and full name. So you can set some additional request headers like in the
 
example below. In this example the user is authenticated via Kerberos and an Apache
 
mod_python fixup handler is used to get the user information from a LDAP server. But you
 
could set the request headers however you want.
 

	
 
.. code-block:: apache
 

	
 
    <Location /someprefix>
 
      ProxyPass http://127.0.0.1:5000/someprefix
 
      ProxyPassReverse http://127.0.0.1:5000/someprefix
 
      SetEnvIf X-Url-Scheme https HTTPS=1
 

	
 
      AuthName "Kerberos Login"
 
      AuthType Kerberos
 
      Krb5Keytab /etc/apache2/http.keytab
 
      KrbMethodK5Passwd off
 
      KrbVerifyKDC on
 
      Require valid-user
 

	
 
      PythonFixupHandler ldapmetadata
 

	
 
      RequestHeader set X_REMOTE_USER %{X_REMOTE_USER}e
 
      RequestHeader set X_REMOTE_EMAIL %{X_REMOTE_EMAIL}e
 
      RequestHeader set X_REMOTE_FIRSTNAME %{X_REMOTE_FIRSTNAME}e
 
      RequestHeader set X_REMOTE_LASTNAME %{X_REMOTE_LASTNAME}e
 
    </Location>
 

	
 
.. code-block:: python
 

	
 
    from mod_python import apache
 
    import ldap
 

	
 
    LDAP_SERVER = "ldap://server.mydomain.com:389"
 
    LDAP_USER = ""
 
    LDAP_PASS = ""
 
    LDAP_ROOT = "dc=mydomain,dc=com"
 
    LDAP_FILTER = "sAMAcountName=%s"
 
    LDAP_ATTR_LIST = ['sAMAcountName','givenname','sn','mail']
 

	
 
    def fixuphandler(req):
 
        if req.user is None:
 
            # no user to search for
 
            return apache.OK
 
        else:
 
            try:
 
                if('\\' in req.user):
 
                    username = req.user.split('\\')[1]
 
                elif('@' in req.user):
 
                    username = req.user.split('@')[0]
 
                else:
 
                    username = req.user
 
                l = ldap.initialize(LDAP_SERVER)
 
                l.simple_bind_s(LDAP_USER, LDAP_PASS)
 
                r = l.search_s(LDAP_ROOT, ldap.SCOPE_SUBTREE, LDAP_FILTER % username, attrlist=LDAP_ATTR_LIST)
 

	
 
                req.subprocess_env['X_REMOTE_USER'] = username
 
                req.subprocess_env['X_REMOTE_EMAIL'] = r[0][1]['mail'][0].lower()
 
                req.subprocess_env['X_REMOTE_FIRSTNAME'] = "%s" % r[0][1]['givenname'][0]
 
                req.subprocess_env['X_REMOTE_LASTNAME'] = "%s" % r[0][1]['sn'][0]
 
            except Exception, e:
 
                apache.log_error("error getting data from ldap %s" % str(e), apache.APLOG_ERR)
 

	
 
            return apache.OK
 

	
 
.. note::
 
   If you enable proxy pass-through authentication, make sure your server is
 
   only accessible through the proxy. Otherwise, any client would be able to
 
   forge the authentication header and could effectively become authenticated
 
   using any account of their liking.
 

	
 

	
 
Integration with issue trackers
 
-------------------------------
 

	
 
Kallithea provides a simple integration with issue trackers. It's possible
 
to define a regular expression that will match an issue ID in commit messages,
 
and have that replaced with a URL to the issue. To enable this simply
 
uncomment the following variables in the ini file::
 

	
 
    issue_pat = (?:^#|\s#)(\w+)
 
    issue_server_link = https://issues.example.com/{repo}/issue/{id}
 
    issue_prefix = #
 

	
 
``issue_pat`` is the regular expression describing which strings in
 
commit messages will be treated as issue references. A match group in
 
parentheses should be used to specify the actual issue id.
 

	
 
The default expression matches issues in the format ``#<number>``, e.g., ``#300``.
 

	
 
Matched issue references are replaced with the link specified in
 
``issue_server_link``. ``{id}`` is replaced with the issue ID, and
 
``{repo}`` with the repository name.  Since the # is stripped away,
 
``issue_prefix`` is prepended to the link text.  ``issue_prefix`` doesn't
 
necessarily need to be ``#``: if you set issue prefix to ``ISSUE-`` this will
 
generate a URL in the format:
 

	
 
.. code-block:: html
 

	
 
  <a href="https://issues.example.com/example_repo/issue/300">ISSUE-300</a>
 

	
 
If needed, more than one pattern can be specified by appending a unique suffix to
 
the variables. For example::
 

	
 
    issue_pat_wiki = (?:wiki-)(.+)
 
    issue_server_link_wiki = https://wiki.example.com/{id}
 
    issue_prefix_wiki = WIKI-
 

	
 
With these settings, wiki pages can be referenced as wiki-some-id, and every
 
such reference will be transformed into:
 

	
 
.. code-block:: html
 

	
kallithea/lib/auth_modules/auth_container.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_modules.auth_container
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea container based authentication plugin
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Nov 17, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
from kallithea.lib import auth_modules
 
from kallithea.lib.utils2 import str2bool, safe_unicode
 
from kallithea.lib.compat import hybrid_property
 
from kallithea.model.db import User
 
from kallithea.model.db import User, Setting
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
 
    def __init__(self):
 
        pass
 

	
 
    @hybrid_property
 
    def name(self):
 
        return "container"
 

	
 
    @hybrid_property
 
    def is_container_auth(self):
 
        return True
 

	
 
    def settings(self):
 

	
 
        settings = [
 
            {
 
                "name": "header",
 
                "validator": self.validators.UnicodeString(strip=True, not_empty=True),
 
                "type": "string",
 
                "description": "Header to extract the user from",
 
                "description": "Request header to extract the username from",
 
                "default": "REMOTE_USER",
 
                "formname": "Header"
 
                "formname": "Username header"
 
            },
 
            {
 
                "name": "email_header",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Optional request header to extract the email from",
 
                "default": "",
 
                "formname": "Email header"
 
            },
 
            {
 
                "name": "firstname_header",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Optional request header to extract the first name from",
 
                "default": "",
 
                "formname": "Firstname header"
 
            },
 
            {
 
                "name": "lastname_header",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Optional request header to extract the last name from",
 
                "default": "",
 
                "formname": "Lastname header"
 
            },
 
            {
 
                "name": "fallback_header",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Header to extract the user from when main one fails",
 
                "description": "Request header to extract the user from when main one fails",
 
                "default": "HTTP_X_FORWARDED_USER",
 
                "formname": "Fallback header"
 
            },
 
            {
 
                "name": "clean_username",
 
                "validator": self.validators.StringBoolean(if_missing=False),
 
                "type": "bool",
 
                "description": "Perform cleaning of user, if passed user has @ in username "
 
                               "then first part before @ is taken. "
 
                               "If there's \\ in the username only the part after \\ is taken",
 
                "default": "True",
 
                "formname": "Clean username"
 
            },
 
        ]
 
        return settings
 

	
 
    def use_fake_password(self):
 
        return True
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.extern_activate.auto' in def_user_perms
 

	
 
    def _clean_username(self, username):
 
        # Removing realm and domain from username
 
        username = username.partition('@')[0]
 
        username = username.rpartition('\\')[2]
 
        return username
 

	
 
    def _get_username(self, environ, settings):
 
        username = None
 
        environ = environ or {}
 
        if not environ:
 
            log.debug('got empty environ: %s', environ)
 

	
 
        settings = settings or {}
 
        if settings.get('header'):
 
            header = settings.get('header')
 
            username = environ.get(header)
 
            log.debug('extracted %s:%s', header, username)
 

	
 
        # fallback mode
 
        if not username and settings.get('fallback_header'):
 
            header = settings.get('fallback_header')
 
            username = environ.get(header)
 
            log.debug('extracted %s:%s', header, username)
 

	
 
        if username and str2bool(settings.get('clean_username')):
 
@@ -127,69 +151,76 @@ class KallitheaAuthPlugin(auth_modules.K
 
        settings = kwargs.get('settings') or {}
 
        username = self._get_username(environ, settings)
 
        # we got the username, so use default method now
 
        return super(KallitheaAuthPlugin, self).get_user(username)
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        """
 
        Gets the container_auth username (or email). It tries to get username
 
        from REMOTE_USER if this plugin is enabled, if that fails
 
        it tries to get username from HTTP_X_FORWARDED_USER if fallback header
 
        is set. clean_username extracts the username from this data if it's
 
        having @ in it.
 
        Return None on failure. On success, return a dictionary of the form:
 

	
 
            see: KallitheaAuthPluginBase.auth_func_attrs
 

	
 
        :param userobj:
 
        :param username:
 
        :param password:
 
        :param settings:
 
        :param kwargs:
 
        """
 
        environ = kwargs.get('environ')
 
        if not environ:
 
            log.debug('Empty environ data skipping...')
 
            return None
 

	
 
        if not userobj:
 
            userobj = self.get_user('', environ=environ, settings=settings)
 

	
 
        # we don't care passed username/password for container auth plugins.
 
        # only way to log in is using environ
 
        username = None
 
        if userobj:
 
            username = getattr(userobj, 'username')
 

	
 
        if not username:
 
            # we don't have any objects in DB, user doesn't exist, extract
 
            # username from environ based on the settings
 
            username = self._get_username(environ, settings)
 

	
 
        # if cannot fetch username, it's a no-go for this plugin to proceed
 
        if not username:
 
            return None
 

	
 
        # old attrs fetched from Kallithea database
 
        admin = getattr(userobj, 'admin', False)
 
        active = getattr(userobj, 'active', True)
 
        email = getattr(userobj, 'email', '')
 
        firstname = getattr(userobj, 'firstname', '')
 
        lastname = getattr(userobj, 'lastname', '')
 
        email = environ.get(settings.get('email_header'), getattr(userobj, 'email', ''))
 
        firstname = environ.get(settings.get('firstname_header'), getattr(userobj, 'firstname', ''))
 
        lastname = environ.get(settings.get('lastname_header'), getattr(userobj, 'lastname', ''))
 

	
 
        user_data = {
 
            'username': username,
 
            'firstname': safe_unicode(firstname or username),
 
            'lastname': safe_unicode(lastname or ''),
 
            'groups': [],
 
            'email': email or '',
 
            'admin': admin or False,
 
            'active': active,
 
            'active_from_extern': True,
 
            'extern_name': username,
 
        }
 

	
 
        log.info('user `%s` authenticated correctly', user_data['username'])
 
        return user_data
 

	
 
    def get_managed_fields(self):
 
        return ['username', 'password']
 
        fields = ['username', 'password']
 
        if(Setting.get_by_name('auth_container_email_header').app_settings_value):
 
            fields.append('email')
 
        if(Setting.get_by_name('auth_container_firstname_header').app_settings_value):
 
            fields.append('firstname')
 
        if(Setting.get_by_name('auth_container_lastname_header').app_settings_value):
 
            fields.append('lastname')
 
        return fields
kallithea/tests/functional/test_admin_auth_settings.py
Show inline comments
 
@@ -90,140 +90,177 @@ class TestAuthSettingsController(TestCon
 
                       'auth_ldap_tls_reqcert': 'NEVER',
 
                       'auth_ldap_dn_user': '',
 
                       'auth_ldap_dn_pass': '',
 
                       'auth_ldap_base_dn': '',
 
                       'auth_ldap_filter': '',
 
                       'auth_ldap_search_scope': 'BASE',
 
                       'auth_ldap_attr_login': '',  # <----- missing required input
 
                       'auth_ldap_attr_firstname': '',
 
                       'auth_ldap_attr_lastname': '',
 
                       'auth_ldap_attr_email': ''})
 

	
 
        test_url = url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 

	
 
        response.mustcontain("""<span class="error-message">The LDAP Login"""
 
                             """ attribute of the CN must be specified""")
 

	
 
    def test_ldap_login(self):
 
        pass
 

	
 
    def test_ldap_login_incorrect(self):
 
        pass
 

	
 
    def _container_auth_setup(self, **settings):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_container')
 
        params.update(settings)
 

	
 
        test_url = url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        response = response.follow()
 
        response.click('Log Out') # end admin login session
 

	
 
    def _container_auth_verify_login(self, resulting_username, **get_kwargs):
 
        response = self.app.get(
 
            url=url(controller='admin/my_account', action='my_account'),
 
            **get_kwargs
 
        )
 
        response.mustcontain('My Account %s' % resulting_username)
 

	
 
    def test_container_auth_login_header(self):
 
        self._container_auth_setup(
 
            auth_container_header='THE_USER_NAME',
 
            auth_container_email_header='',
 
            auth_container_firstname_header='',
 
            auth_container_lastname_header='',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='False',
 
        )
 
        self._container_auth_verify_login(
 
            extra_environ={'THE_USER_NAME': 'john@example.org'},
 
            resulting_username='john@example.org',
 
        )
 

	
 
    def test_container_auth_login_header_attr(self):
 
        self._container_auth_setup(
 
            auth_container_header='THE_USER_NAME',
 
            auth_container_email_header='THE_USER_EMAIL',
 
            auth_container_firstname_header='THE_USER_FIRSTNAME',
 
            auth_container_lastname_header='THE_USER_LASTNAME',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='False',
 
        )
 
        response = self.app.get(
 
            url=url(controller='admin/my_account', action='my_account'),
 
            extra_environ={'THE_USER_NAME': 'johnd',
 
                           'THE_USER_EMAIL': 'john@example.org',
 
                           'THE_USER_FIRSTNAME': 'John',
 
                           'THE_USER_LASTNAME': 'Doe',
 
                           }
 
        )
 
        self.assertEqual(response.form['email'].value, 'john@example.org')
 
        self.assertEqual(response.form['firstname'].value, 'John')
 
        self.assertEqual(response.form['lastname'].value, 'Doe')
 

	
 

	
 
    def test_container_auth_login_fallback_header(self):
 
        self._container_auth_setup(
 
            auth_container_header='THE_USER_NAME',
 
            auth_container_email_header='',
 
            auth_container_firstname_header='',
 
            auth_container_lastname_header='',
 
            auth_container_fallback_header='HTTP_X_YZZY',
 
            auth_container_clean_username='False',
 
        )
 
        self._container_auth_verify_login(
 
            headers={'X-Yzzy': r'foo\bar'},
 
            resulting_username=r'foo\bar',
 
        )
 

	
 
    def test_container_auth_clean_username_at(self):
 
        self._container_auth_setup(
 
            auth_container_header='REMOTE_USER',
 
            auth_container_email_header='',
 
            auth_container_firstname_header='',
 
            auth_container_lastname_header='',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='True',
 
        )
 
        self._container_auth_verify_login(
 
            extra_environ={'REMOTE_USER': 'john@example.org'},
 
            resulting_username='john',
 
        )
 

	
 
    def test_container_auth_clean_username_backslash(self):
 
        self._container_auth_setup(
 
            auth_container_header='REMOTE_USER',
 
            auth_container_email_header='',
 
            auth_container_firstname_header='',
 
            auth_container_lastname_header='',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='True',
 
        )
 
        self._container_auth_verify_login(
 
            extra_environ={'REMOTE_USER': r'example\jane'},
 
            resulting_username=r'jane',
 
        )
 

	
 
    def test_container_auth_no_logout(self):
 
        self._container_auth_setup(
 
            auth_container_header='REMOTE_USER',
 
            auth_container_email_header='',
 
            auth_container_firstname_header='',
 
            auth_container_lastname_header='',
 
            auth_container_fallback_header='',
 
            auth_container_clean_username='True',
 
        )
 
        response = self.app.get(
 
            url=url(controller='admin/my_account', action='my_account'),
 
            extra_environ={'REMOTE_USER': 'john'},
 
        )
 
        self.assertNotIn('Log Out', response.normal_body)
 

	
 
    def test_crowd_save_settings(self):
 
        self.log_user()
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_crowd')
 
        params.update({'auth_crowd_host': ' hostname ',
 
                       'auth_crowd_app_password': 'secret',
 
                       'auth_crowd_admin_groups': 'mygroup',
 
                       'auth_crowd_port': '123',
 
                       'auth_crowd_app_name': 'xyzzy'})
 

	
 
        test_url = url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
        new_settings = Setting.get_auth_settings()
 
        self.assertEqual(new_settings['auth_crowd_host'], u'hostname',
 
                         'fail db write compare')
 

	
 
    def test_pam_save_settings(self):
 
        self.log_user()
 

	
 
        if not pam_lib_installed:
 
            raise SkipTest('skipping due to missing pam lib')
 

	
 
        params = self._enable_plugins('kallithea.lib.auth_modules.auth_internal,kallithea.lib.auth_modules.auth_pam')
 
        params.update({'auth_pam_service': 'kallithea',
 
                       'auth_pam_gecos': '^foo-.*'})
 

	
 
        test_url = url(controller='admin/auth_settings',
 
                       action='auth_settings')
 

	
 
        response = self.app.post(url=test_url, params=params)
 
        self.checkSessionFlash(response, 'Auth settings updated successfully')
 

	
 
        new_settings = Setting.get_auth_settings()
 
        self.assertEqual(new_settings['auth_pam_service'], u'kallithea',
 
                         'fail db write compare')
0 comments (0 inline, 0 general)