Changeset - 03afa7766ac7
[Not reviewed]
default
0 7 0
Mads Kiilerich - 10 years ago 2015-07-31 15:44:07
madski@unity3d.com
auth: consistently use the name 'user_data' for the dict with data for the authenticated user
7 files changed with 50 insertions and 50 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth_modules/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Authentication modules
 
"""
 

	
 
import logging
 
import traceback
 

	
 
from kallithea import EXTERN_TYPE_INTERNAL
 
from kallithea.lib.compat import importlib
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.lib.compat import formatted_json, hybrid_property
 
from kallithea.lib.auth import PasswordGenerator
 
from kallithea.model.user import UserModel
 
from kallithea.model.db import Setting, User
 
from kallithea.model.meta import Session
 
from kallithea.model.user_group import UserGroupModel
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class LazyFormencode(object):
 
    def __init__(self, formencode_obj, *args, **kwargs):
 
        self.formencode_obj = formencode_obj
 
        self.args = args
 
        self.kwargs = kwargs
 

	
 
    def __call__(self, *args, **kwargs):
 
        from inspect import isfunction
 
        formencode_obj = self.formencode_obj
 
        if isfunction(formencode_obj):
 
            #case we wrap validators into functions
 
            formencode_obj = self.formencode_obj(*args, **kwargs)
 
        return formencode_obj(*self.args, **self.kwargs)
 

	
 

	
 
class KallitheaAuthPluginBase(object):
 
    auth_func_attrs = {
 
        "username": "unique username",
 
        "firstname": "first name",
 
        "lastname": "last name",
 
        "email": "email address",
 
        "groups": '["list", "of", "groups"]',
 
        "extern_name": "name in external source of record",
 
        "extern_type": "type of external source of record",
 
        "admin": 'True|False defines if user should be Kallithea admin',
 
        "active": 'True|False defines active state of user in Kallithea',
 
        "active_from_extern": "True|False|None, active state from the external auth, "
 
                              "None means use value from the auth plugin"
 
    }
 

	
 
    @property
 
    def validators(self):
 
        """
 
        Exposes Kallithea validators modules
 
        """
 
        # this is a hack to overcome issues with pylons threadlocals and
 
        # translator object _() not being registered properly.
 
        class LazyCaller(object):
 
            def __init__(self, name):
 
                self.validator_name = name
 

	
 
            def __call__(self, *args, **kwargs):
 
                from kallithea.model import validators as v
 
                obj = getattr(v, self.validator_name)
 
                #log.debug('Initializing lazy formencode object: %s' % obj)
 
                return LazyFormencode(obj, *args, **kwargs)
 

	
 

	
 
        class ProxyGet(object):
 
            def __getattribute__(self, name):
 
                return LazyCaller(name)
 

	
 
        return ProxyGet()
 

	
 
    @hybrid_property
 
    def name(self):
 
        """
 
        Returns the name of this authentication plugin.
 

	
 
        :returns: string
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    @hybrid_property
 
    def is_container_auth(self):
 
        """
 
        Returns bool if this module uses container auth.
 

	
 
        This property will trigger an automatic call to authenticate on
 
        a visit to the website or during a push/pull.
 

	
 
        :returns: bool
 
        """
 
        return False
 

	
 
    def accepts(self, user, accepts_empty=True):
 
        """
 
        Checks if this authentication module should accept a request for
 
        the current user.
 

	
 
        :param user: user object fetched using plugin's get_user() method.
 
        :param accepts_empty: if True accepts don't allow the user to be empty
 
        :returns: boolean
 
        """
 
        plugin_name = self.name
 
        if not user and not accepts_empty:
 
            log.debug('User is empty not allowed to authenticate')
 
            return False
 

	
 
        if user and user.extern_type and user.extern_type != plugin_name:
 
            log.debug('User %s should authenticate using %s this is %s, skipping'
 
                      % (user, user.extern_type, plugin_name))
 

	
 
            return False
 
        return True
 

	
 
    def get_user(self, username=None, **kwargs):
 
        """
 
        Helper method for user fetching in plugins, by default it's using
 
        simple fetch by username, but this method can be customized in plugins
 
        eg. container auth plugin to fetch user by environ params
 

	
 
        :param username: username if given to fetch from database
 
        :param kwargs: extra arguments needed for user fetching.
 
        """
 
        user = None
 
        log.debug('Trying to fetch user `%s` from Kallithea database'
 
                  % (username))
 
        if username:
 
            user = User.get_by_username(username)
 
            if not user:
 
                log.debug('Fallback to fetch user in case insensitive mode')
 
                user = User.get_by_username(username, case_insensitive=True)
 
        else:
 
            log.debug('provided username:`%s` is empty skipping...' % username)
 
        return user
 

	
 
    def settings(self):
 
        """
 
        Return a list of the form:
 
        [
 
            {
 
                "name": "OPTION_NAME",
 
                "type": "[bool|password|string|int|select]",
 
                ["values": ["opt1", "opt2", ...]]
 
                "validator": "expr"
 
                "description": "A short description of the option" [,
 
                "default": Default Value],
 
                ["formname": "Friendly Name for Forms"]
 
            } [, ...]
 
        ]
 

	
 
        This is used to interrogate the authentication plugin as to what
 
        settings it expects to be present and configured.
 

	
 
        'type' is a shorthand notation for what kind of value this option is.
 
        This is primarily used by the auth web form to control how the option
 
        is configured.
 
                bool : checkbox
 
                password : password input box
 
                string : input box
 
                select : single select dropdown
 

	
 
        'validator' is an lazy instantiated form field validator object, ala
 
        formencode. You need to *call* this object to init the validators.
 
        All calls to Kallithea validators should be used through self.validators
 
        which is a lazy loading proxy of formencode module.
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def plugin_settings(self):
 
        """
 
        This method is called by the authentication framework, not the .settings()
 
        method. This method adds a few default settings (e.g., "enabled"), so that
 
        plugin authors don't have to maintain a bunch of boilerplate.
 

	
 
        OVERRIDING THIS METHOD WILL CAUSE YOUR PLUGIN TO FAIL.
 
        """
 

	
 
        rcsettings = self.settings()
 
        rcsettings.insert(0, {
 
            "name": "enabled",
 
            "validator": self.validators.StringBoolean(if_missing=False),
 
            "type": "bool",
 
            "description": "Enable or Disable this Authentication Plugin",
 
            "formname": "Enabled"
 
            }
 
        )
 
        return rcsettings
 

	
 
    def user_activation_state(self):
 
        """
 
        Defines user activation state when creating new users
 

	
 
        :returns: boolean
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def auth(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Given a user object (which may be None), username, a plaintext password,
 
        and a settings object (containing all the keys needed as listed in settings()),
 
        authenticate this user's login attempt.
 

	
 
        Return None on failure. On success, return a dictionary with keys from
 
        KallitheaAuthPluginBase.auth_func_attrs.
 

	
 
        This is later validated for correctness.
 
        """
 
        raise NotImplementedError("not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        """
 
        Wrapper to call self.auth() that validates call on it
 

	
 
        :param userobj: userobj
 
        :param username: username
 
        :param passwd: plaintext password
 
        :param settings: plugin settings
 
        """
 
        auth = self.auth(userobj, username, passwd, settings, **kwargs)
 
        if auth is not None:
 
            return self._validate_auth_return(auth)
 
        user_data = self.auth(userobj, username, passwd, settings, **kwargs)
 
        if user_data is not None:
 
            return self._validate_auth_return(user_data)
 
        return None
 

	
 
    def _validate_auth_return(self, ret):
 
        if not isinstance(ret, dict):
 
    def _validate_auth_return(self, user_data):
 
        if not isinstance(user_data, dict):
 
            raise Exception('returned value from auth must be a dict')
 
        for k in self.auth_func_attrs:
 
            if k not in ret:
 
            if k not in user_data:
 
                raise Exception('Missing %s attribute from returned data' % k)
 
        return ret
 
        return user_data
 

	
 

	
 
class KallitheaExternalAuthPlugin(KallitheaAuthPluginBase):
 
    def use_fake_password(self):
 
        """
 
        Return a boolean that indicates whether or not we should set the user's
 
        password to a random value when it is authenticated by this plugin.
 
        If your plugin provides authentication, then you will generally want this.
 

	
 
        :returns: boolean
 
        """
 
        raise NotImplementedError("Not implemented in base class")
 

	
 
    def _authenticate(self, userobj, username, passwd, settings, **kwargs):
 
        auth = super(KallitheaExternalAuthPlugin, self)._authenticate(
 
        user_data = super(KallitheaExternalAuthPlugin, self)._authenticate(
 
            userobj, username, passwd, settings, **kwargs)
 
        if auth is not None:
 
        if user_data is not None:
 
            # maybe plugin will clean the username ?
 
            # we should use the return value
 
            username = auth['username']
 
            username = user_data['username']
 
            # if user is not active from our extern type we should fail to auth
 
            # this can prevent from creating users in Kallithea when using
 
            # external authentication, but if it's inactive user we shouldn't
 
            # create that user anyway
 
            if auth['active_from_extern'] is False:
 
            if user_data['active_from_extern'] is False:
 
                log.warning("User %s authenticated against %s, but is inactive"
 
                            % (username, self.__module__))
 
                return None
 

	
 
            if self.use_fake_password():
 
                # Randomize the PW because we don't need it, but don't want
 
                # them blank either
 
                passwd = PasswordGenerator().gen_password(length=8)
 

	
 
            log.debug('Updating or creating user info from %s plugin'
 
                      % self.name)
 
            user = UserModel().create_or_update(
 
                username=username,
 
                password=passwd,
 
                email=auth["email"],
 
                firstname=auth["firstname"],
 
                lastname=auth["lastname"],
 
                active=auth["active"],
 
                admin=auth["admin"],
 
                extern_name=auth["extern_name"],
 
                email=user_data["email"],
 
                firstname=user_data["firstname"],
 
                lastname=user_data["lastname"],
 
                active=user_data["active"],
 
                admin=user_data["admin"],
 
                extern_name=user_data["extern_name"],
 
                extern_type=self.name
 
            )
 
            Session().flush()
 
            # enforce user is just in given groups, all of them has to be ones
 
            # created from plugins. We store this info in _group_data JSON field
 
            groups = auth['groups'] or []
 
            groups = user_data['groups'] or []
 
            UserGroupModel().enforce_groups(user, groups, self.name)
 
            Session().commit()
 
        return auth
 
        return user_data
 

	
 

	
 
def importplugin(plugin):
 
    """
 
    Imports and returns the authentication plugin in the module named by plugin
 
    (e.g., plugin='kallithea.lib.auth_modules.auth_internal'). Returns the
 
    KallitheaAuthPluginBase subclass on success, raises exceptions on failure.
 

	
 
    raises:
 
        AttributeError -- no KallitheaAuthPlugin class in the module
 
        TypeError -- if the KallitheaAuthPlugin is not a subclass of ours KallitheaAuthPluginBase
 
        ImportError -- if we couldn't import the plugin at all
 
    """
 
    log.debug("Importing %s" % plugin)
 
    if not plugin.startswith(u'kallithea.lib.auth_modules.auth_'):
 
        parts = plugin.split(u'.lib.auth_modules.auth_', 1)
 
        if len(parts) == 2:
 
            _module, pn = parts
 
            if pn == EXTERN_TYPE_INTERNAL:
 
                pn = "internal"
 
            plugin = u'kallithea.lib.auth_modules.auth_' + pn
 
    PLUGIN_CLASS_NAME = "KallitheaAuthPlugin"
 
    try:
 
        module = importlib.import_module(plugin)
 
    except (ImportError, TypeError):
 
        log.error(traceback.format_exc())
 
        # TODO: make this more error prone, if by some accident we screw up
 
        # the plugin name, the crash is pretty bad and hard to recover
 
        raise
 

	
 
    log.debug("Loaded auth plugin from %s (module:%s, file:%s)"
 
              % (plugin, module.__name__, module.__file__))
 

	
 
    pluginclass = getattr(module, PLUGIN_CLASS_NAME)
 
    if not issubclass(pluginclass, KallitheaAuthPluginBase):
 
        raise TypeError("Authentication class %s.KallitheaAuthPlugin is not "
 
                        "a subclass of %s" % (plugin, KallitheaAuthPluginBase))
 
    return pluginclass
 

	
 

	
 
def loadplugin(plugin):
 
    """
 
    Loads and returns an instantiated authentication plugin.
 

	
 
        see: importplugin
 
    """
 
    plugin = importplugin(plugin)()
 
    if plugin.plugin_settings.im_func != KallitheaAuthPluginBase.plugin_settings.im_func:
 
        raise TypeError("Authentication class %s.KallitheaAuthPluginBase "
 
                        "has overridden the plugin_settings method, which is "
 
                        "forbidden." % plugin)
 
    return plugin
 

	
 

	
 
def authenticate(username, password, environ=None):
 
    """
 
    Authentication function used for access control,
 
    It tries to authenticate based on enabled authentication modules.
 

	
 
    :param username: username can be empty for container auth
 
    :param password: password can be empty for container auth
 
    :param environ: environ headers passed for container auth
 
    :returns: None if auth failed, plugin_user dict if auth is correct
 
    :returns: None if auth failed, user_data dict if auth is correct
 
    """
 

	
 
    auth_plugins = Setting.get_auth_plugins()
 
    log.debug('Authentication against %s plugins' % (auth_plugins,))
 
    for module in auth_plugins:
 
        try:
 
            plugin = loadplugin(module)
 
        except (ImportError, AttributeError, TypeError), e:
 
            raise ImportError('Failed to load authentication module %s : %s'
 
                              % (module, str(e)))
 
        log.debug('Trying authentication using ** %s **' % (module,))
 
        # load plugin settings from Kallithea database
 
        plugin_name = plugin.name
 
        plugin_settings = {}
 
        for v in plugin.plugin_settings():
 
            conf_key = "auth_%s_%s" % (plugin_name, v["name"])
 
            setting = Setting.get_by_name(conf_key)
 
            plugin_settings[v["name"]] = setting.app_settings_value if setting else None
 
        log.debug('Plugin settings \n%s' % formatted_json(plugin_settings))
 

	
 
        if not str2bool(plugin_settings["enabled"]):
 
            log.info("Authentication plugin %s is disabled, skipping for %s"
 
                     % (module, username))
 
            continue
 

	
 
        # use plugin's method of user extraction.
 
        user = plugin.get_user(username, environ=environ,
 
                               settings=plugin_settings)
 
        log.debug('Plugin %s extracted user is `%s`' % (module, user))
 
        if not plugin.accepts(user):
 
            log.debug('Plugin %s does not accept user `%s` for authentication'
 
                      % (module, user))
 
            continue
 
        else:
 
            log.debug('Plugin %s accepted user `%s` for authentication'
 
                      % (module, user))
 

	
 
        log.info('Authenticating user using %s plugin' % plugin.__module__)
 
        # _authenticate is a wrapper for .auth() method of plugin.
 
        # it checks if .auth() sends proper data. For KallitheaExternalAuthPlugin
 
        # it also maps users to Database and maps the attributes returned
 
        # from .auth() to Kallithea database. If this function returns data
 
        # then auth is correct.
 
        plugin_user = plugin._authenticate(user, username, password,
 
        user_data = plugin._authenticate(user, username, password,
 
                                           plugin_settings,
 
                                           environ=environ or {})
 
        log.debug('PLUGIN USER DATA: %s' % plugin_user)
 
        log.debug('PLUGIN USER DATA: %s' % user_data)
 

	
 
        if plugin_user is not None:
 
        if user_data is not None:
 
            log.debug('Plugin returned proper authentication data')
 
            return plugin_user
 
            return user_data
 

	
 
        # we failed to Auth because .auth() method didn't return the user
 
        if username:
 
            log.warning("User `%s` failed to authenticate against %s"
 
                        % (username, plugin.__module__))
 
    return None
kallithea/lib/auth_modules/auth_container.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_modules.auth_container
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea container based authentication plugin
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Nov 17, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
from kallithea.lib import auth_modules
 
from kallithea.lib.utils2 import str2bool, safe_unicode
 
from kallithea.lib.compat import hybrid_property
 
from kallithea.model.db import User
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
 
    def __init__(self):
 
        pass
 

	
 
    @hybrid_property
 
    def name(self):
 
        return "container"
 

	
 
    @hybrid_property
 
    def is_container_auth(self):
 
        return True
 

	
 
    def settings(self):
 

	
 
        settings = [
 
            {
 
                "name": "header",
 
                "validator": self.validators.UnicodeString(strip=True, not_empty=True),
 
                "type": "string",
 
                "description": "Header to extract the user from",
 
                "default": "REMOTE_USER",
 
                "formname": "Header"
 
            },
 
            {
 
                "name": "fallback_header",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Header to extract the user from when main one fails",
 
                "default": "HTTP_X_FORWARDED_USER",
 
                "formname": "Fallback header"
 
            },
 
            {
 
                "name": "clean_username",
 
                "validator": self.validators.StringBoolean(if_missing=False),
 
                "type": "bool",
 
                "description": "Perform cleaning of user, if passed user has @ in username "
 
                               "then first part before @ is taken. "
 
                               "If there's \\ in the username only the part after \\ is taken",
 
                "default": "True",
 
                "formname": "Clean username"
 
            },
 
        ]
 
        return settings
 

	
 
    def use_fake_password(self):
 
        return True
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.extern_activate.auto' in def_user_perms
 

	
 
    def _clean_username(self, username):
 
        # Removing realm and domain from username
 
        username = username.partition('@')[0]
 
        username = username.rpartition('\\')[2]
 
        return username
 

	
 
    def _get_username(self, environ, settings):
 
        username = None
 
        environ = environ or {}
 
        if not environ:
 
            log.debug('got empty environ: %s' % environ)
 

	
 
        settings = settings or {}
 
        if settings.get('header'):
 
            header = settings.get('header')
 
            username = environ.get(header)
 
            log.debug('extracted %s:%s' % (header, username))
 

	
 
        # fallback mode
 
        if not username and settings.get('fallback_header'):
 
            header = settings.get('fallback_header')
 
            username = environ.get(header)
 
            log.debug('extracted %s:%s' % (header, username))
 

	
 
        if username and str2bool(settings.get('clean_username')):
 
            log.debug('Received username %s from container' % username)
 
            username = self._clean_username(username)
 
            log.debug('New cleanup user is: %s' % username)
 
        return username
 

	
 
    def get_user(self, username=None, **kwargs):
 
        """
 
        Helper method for user fetching in plugins, by default it's using
 
        simple fetch by username, but this method can be customized in plugins
 
        eg. container auth plugin to fetch user by environ params
 
        :param username: username if given to fetch
 
        :param kwargs: extra arguments needed for user fetching.
 
        """
 
        environ = kwargs.get('environ') or {}
 
        settings = kwargs.get('settings') or {}
 
        username = self._get_username(environ, settings)
 
        # we got the username, so use default method now
 
        return super(KallitheaAuthPlugin, self).get_user(username)
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        """
 
        Gets the container_auth username (or email). It tries to get username
 
        from REMOTE_USER if this plugin is enabled, if that fails
 
        it tries to get username from HTTP_X_FORWARDED_USER if fallback header
 
        is set. clean_username extracts the username from this data if it's
 
        having @ in it.
 
        Return None on failure. On success, return a dictionary of the form:
 

	
 
            see: KallitheaAuthPluginBase.auth_func_attrs
 

	
 
        :param userobj:
 
        :param username:
 
        :param password:
 
        :param settings:
 
        :param kwargs:
 
        """
 
        environ = kwargs.get('environ')
 
        if not environ:
 
            log.debug('Empty environ data skipping...')
 
            return None
 

	
 
        if not userobj:
 
            userobj = self.get_user('', environ=environ, settings=settings)
 

	
 
        # we don't care passed username/password for container auth plugins.
 
        # only way to log in is using environ
 
        username = None
 
        if userobj:
 
            username = getattr(userobj, 'username')
 

	
 
        if not username:
 
            # we don't have any objects in DB, user doesn't exist, extract
 
            # username from environ based on the settings
 
            username = self._get_username(environ, settings)
 

	
 
        # if cannot fetch username, it's a no-go for this plugin to proceed
 
        if not username:
 
            return None
 

	
 
        # old attrs fetched from Kallithea database
 
        admin = getattr(userobj, 'admin', False)
 
        active = getattr(userobj, 'active', True)
 
        email = getattr(userobj, 'email', '')
 
        firstname = getattr(userobj, 'firstname', '')
 
        lastname = getattr(userobj, 'lastname', '')
 
        extern_type = getattr(userobj, 'extern_type', '')
 

	
 
        user_attrs = {
 
        user_data = {
 
            'username': username,
 
            'firstname': safe_unicode(firstname or username),
 
            'lastname': safe_unicode(lastname or ''),
 
            'groups': [],
 
            'email': email or '',
 
            'admin': admin or False,
 
            'active': active,
 
            'active_from_extern': True,
 
            'extern_name': username,
 
            'extern_type': extern_type,
 
        }
 

	
 
        log.info('user `%s` authenticated correctly' % user_attrs['username'])
 
        return user_attrs
 
        log.info('user `%s` authenticated correctly' % user_data['username'])
 
        return user_data
kallithea/lib/auth_modules/auth_crowd.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_modules.auth_crowd
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea authentication plugin for Atlassian CROWD
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Nov 17, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import base64
 
import logging
 
import urllib2
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import json, formatted_json, hybrid_property
 
from kallithea.model.db import User
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class CrowdServer(object):
 
    def __init__(self, *args, **kwargs):
 
        """
 
        Create a new CrowdServer object that points to IP/Address 'host',
 
        on the given port, and using the given method (https/http). user and
 
        passwd can be set here or with set_credentials. If unspecified,
 
        "version" defaults to "latest".
 

	
 
        example::
 

	
 
            cserver = CrowdServer(host="127.0.0.1",
 
                                  port="8095",
 
                                  user="some_app",
 
                                  passwd="some_passwd",
 
                                  version="1")
 
        """
 
        if not "port" in kwargs:
 
            kwargs["port"] = "8095"
 
        self._logger = kwargs.get("logger", logging.getLogger(__name__))
 
        self._uri = "%s://%s:%s/crowd" % (kwargs.get("method", "http"),
 
                                    kwargs.get("host", "127.0.0.1"),
 
                                    kwargs.get("port", "8095"))
 
        self.set_credentials(kwargs.get("user", ""),
 
                             kwargs.get("passwd", ""))
 
        self._version = kwargs.get("version", "latest")
 
        self._url_list = None
 
        self._appname = "crowd"
 

	
 
    def set_credentials(self, user, passwd):
 
        self.user = user
 
        self.passwd = passwd
 
        self._make_opener()
 

	
 
    def _make_opener(self):
 
        mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
 
        mgr.add_password(None, self._uri, self.user, self.passwd)
 
        handler = urllib2.HTTPBasicAuthHandler(mgr)
 
        self.opener = urllib2.build_opener(handler)
 

	
 
    def _request(self, url, body=None, headers=None,
 
                 method=None, noformat=False,
 
                 empty_response_ok=False):
 
        _headers = {"Content-type": "application/json",
 
                    "Accept": "application/json"}
 
        if self.user and self.passwd:
 
            authstring = base64.b64encode("%s:%s" % (self.user, self.passwd))
 
            _headers["Authorization"] = "Basic %s" % authstring
 
        if headers:
 
            _headers.update(headers)
 
        log.debug("Sent crowd: \n%s"
 
                  % (formatted_json({"url": url, "body": body,
 
                                           "headers": _headers})))
 
        request = urllib2.Request(url, body, _headers)
 
        if method:
 
            request.get_method = lambda: method
 

	
 
        global msg
 
        msg = ""
 
        try:
 
            rdoc = self.opener.open(request)
 
            msg = "".join(rdoc.readlines())
 
            if not msg and empty_response_ok:
 
                rval = {}
 
                rval["status"] = True
 
                rval["error"] = "Response body was empty"
 
            elif not noformat:
 
                rval = json.loads(msg)
 
                rval["status"] = True
 
            else:
 
                rval = "".join(rdoc.readlines())
 
        except Exception, e:
 
            if not noformat:
 
                rval = {"status": False,
 
                        "body": body,
 
                        "error": str(e) + "\n" + msg}
 
            else:
 
                rval = None
 
        return rval
 

	
 
    def user_auth(self, username, password):
 
        """Authenticate a user against crowd. Returns brief information about
 
        the user."""
 
        url = ("%s/rest/usermanagement/%s/authentication?username=%s"
 
               % (self._uri, self._version, username))
 
        body = json.dumps({"value": password})
 
        return self._request(url, body)
 

	
 
    def user_groups(self, username):
 
        """Retrieve a list of groups to which this user belongs."""
 
        url = ("%s/rest/usermanagement/%s/user/group/nested?username=%s"
 
               % (self._uri, self._version, username))
 
        return self._request(url)
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
 

	
 
    @hybrid_property
 
    def name(self):
 
        return "crowd"
 

	
 
    def settings(self):
 
        settings = [
 
            {
 
                "name": "host",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "The FQDN or IP of the Atlassian CROWD Server",
 
                "default": "127.0.0.1",
 
                "formname": "Host"
 
            },
 
            {
 
                "name": "port",
 
                "validator": self.validators.Number(strip=True),
 
                "type": "int",
 
                "description": "The Port in use by the Atlassian CROWD Server",
 
                "default": 8095,
 
                "formname": "Port"
 
            },
 
            {
 
                "name": "app_name",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "The Application Name to authenticate to CROWD",
 
                "default": "",
 
                "formname": "Application Name"
 
            },
 
            {
 
                "name": "app_password",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "The password to authenticate to CROWD",
 
                "default": "",
 
                "formname": "Application Password"
 
            },
 
            {
 
                "name": "admin_groups",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "A comma separated list of group names that identify users as Kallithea Administrators",
 
                "formname": "Admin Groups"
 
            }
 
        ]
 
        return settings
 

	
 
    def use_fake_password(self):
 
        return True
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.extern_activate.auto' in def_user_perms
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        """
 
        Given a user object (which may be null), username, a plaintext password,
 
        and a settings object (containing all the keys needed as listed in settings()),
 
        authenticate this user's login attempt.
 

	
 
        Return None on failure. On success, return a dictionary of the form:
 

	
 
            see: KallitheaAuthPluginBase.auth_func_attrs
 
        This is later validated for correctness
 
        """
 
        if not username or not password:
 
            log.debug('Empty username or password skipping...')
 
            return None
 

	
 
        log.debug("Crowd settings: \n%s" % (formatted_json(settings)))
 
        server = CrowdServer(**settings)
 
        server.set_credentials(settings["app_name"], settings["app_password"])
 
        crowd_user = server.user_auth(username, password)
 
        log.debug("Crowd returned: \n%s" % (formatted_json(crowd_user)))
 
        if not crowd_user["status"]:
 
            return None
 

	
 
        res = server.user_groups(crowd_user["name"])
 
        log.debug("Crowd groups: \n%s" % (formatted_json(res)))
 
        crowd_user["groups"] = [x["name"] for x in res["groups"]]
 

	
 
        # old attrs fetched from Kallithea database
 
        admin = getattr(userobj, 'admin', False)
 
        active = getattr(userobj, 'active', True)
 
        email = getattr(userobj, 'email', '')
 
        firstname = getattr(userobj, 'firstname', '')
 
        lastname = getattr(userobj, 'lastname', '')
 
        extern_type = getattr(userobj, 'extern_type', '')
 

	
 
        user_attrs = {
 
        user_data = {
 
            'username': username,
 
            'firstname': crowd_user["first-name"] or firstname,
 
            'lastname': crowd_user["last-name"] or lastname,
 
            'groups': crowd_user["groups"],
 
            'email': crowd_user["email"] or email,
 
            'admin': admin,
 
            'active': active,
 
            'active_from_extern': crowd_user.get('active'),
 
            'extern_name': crowd_user["name"],
 
            'extern_type': extern_type,
 
        }
 

	
 
        # set an admin if we're in admin_groups of crowd
 
        for group in settings["admin_groups"].split(","):
 
            if group in user_attrs["groups"]:
 
                user_attrs["admin"] = True
 
        log.debug("Final crowd user object: \n%s" % (formatted_json(user_attrs)))
 
        log.info('user %s authenticated correctly' % user_attrs['username'])
 
        return user_attrs
 
            if group in user_data["groups"]:
 
                user_data["admin"] = True
 
        log.debug("Final crowd user object: \n%s" % (formatted_json(user_data)))
 
        log.info('user %s authenticated correctly' % user_data['username'])
 
        return user_data
kallithea/lib/auth_modules/auth_internal.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_modules.auth_internal
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea authentication plugin for built in internal auth
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Nov 17, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 

	
 
from kallithea import EXTERN_TYPE_INTERNAL
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import formatted_json, hybrid_property
 
from kallithea.model.db import User
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaAuthPluginBase):
 
    def __init__(self):
 
        pass
 

	
 
    @hybrid_property
 
    def name(self):
 
        return EXTERN_TYPE_INTERNAL
 

	
 
    def settings(self):
 
        return []
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.register.auto_activate' in def_user_perms
 

	
 
    def accepts(self, user, accepts_empty=True):
 
        """
 
        Custom accepts for this auth that doesn't accept empty users. We
 
        know that user exists in database.
 
        """
 
        return super(KallitheaAuthPlugin, self).accepts(user,
 
                                                        accepts_empty=False)
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        if not userobj:
 
            log.debug('userobj was:%s skipping' % (userobj, ))
 
            return None
 
        if userobj.extern_type != self.name:
 
            log.warning("userobj:%s extern_type mismatch got:`%s` expected:`%s`"
 
                     % (userobj, userobj.extern_type, self.name))
 
            return None
 

	
 
        user_attrs = {
 
        user_data = {
 
            "username": userobj.username,
 
            "firstname": userobj.firstname,
 
            "lastname": userobj.lastname,
 
            "groups": [],
 
            "email": userobj.email,
 
            "admin": userobj.admin,
 
            "active": userobj.active,
 
            "active_from_extern": userobj.active,
 
            "extern_name": userobj.user_id,
 
            'extern_type': userobj.extern_type,
 
        }
 

	
 
        log.debug(formatted_json(user_attrs))
 
        log.debug(formatted_json(user_data))
 
        if userobj.active:
 
            from kallithea.lib import auth
 
            password_match = auth.KallitheaCrypto.hash_check(password, userobj.password)
 
            if userobj.username == User.DEFAULT_USER and userobj.active:
 
                log.info('user %s authenticated correctly as anonymous user' %
 
                         username)
 
                return user_attrs
 
                return user_data
 

	
 
            elif userobj.username == username and password_match:
 
                log.info('user %s authenticated correctly' % user_attrs['username'])
 
                return user_attrs
 
                log.info('user %s authenticated correctly' % user_data['username'])
 
                return user_data
 
            log.error("user %s had a bad password" % username)
 
            return None
 
        else:
 
            log.warning('user %s tried auth but is disabled' % username)
 
            return None
kallithea/lib/auth_modules/auth_ldap.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_modules.auth_ldap
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea authentication plugin for LDAP
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Nov 17, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import traceback
 

	
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import hybrid_property
 
from kallithea.lib.utils2 import safe_unicode, safe_str
 
from kallithea.lib.exceptions import (
 
    LdapConnectionError, LdapUsernameError, LdapPasswordError, LdapImportError
 
)
 
from kallithea.model.db import User
 

	
 
log = logging.getLogger(__name__)
 

	
 
try:
 
    import ldap
 
except ImportError:
 
    # means that python-ldap is not installed
 
    ldap = None
 

	
 

	
 
class AuthLdap(object):
 

	
 
    def __init__(self, server, base_dn, port=389, bind_dn='', bind_pass='',
 
                 tls_kind='PLAIN', tls_reqcert='DEMAND', ldap_version=3,
 
                 ldap_filter='(&(objectClass=user)(!(objectClass=computer)))',
 
                 search_scope='SUBTREE', attr_login='uid'):
 
        if ldap is None:
 
            raise LdapImportError
 

	
 
        self.ldap_version = ldap_version
 
        ldap_server_type = 'ldap'
 

	
 
        self.TLS_KIND = tls_kind
 

	
 
        if self.TLS_KIND == 'LDAPS':
 
            port = port or 689
 
            ldap_server_type = ldap_server_type + 's'
 

	
 
        OPT_X_TLS_DEMAND = 2
 
        self.TLS_REQCERT = getattr(ldap, 'OPT_X_TLS_%s' % tls_reqcert,
 
                                   OPT_X_TLS_DEMAND)
 
        # split server into list
 
        self.LDAP_SERVER_ADDRESS = server.split(',')
 
        self.LDAP_SERVER_PORT = port
 

	
 
        # USE FOR READ ONLY BIND TO LDAP SERVER
 
        self.LDAP_BIND_DN = safe_str(bind_dn)
 
        self.LDAP_BIND_PASS = safe_str(bind_pass)
 
        _LDAP_SERVERS = []
 
        for host in self.LDAP_SERVER_ADDRESS:
 
            _LDAP_SERVERS.append("%s://%s:%s" % (ldap_server_type,
 
                                                     host.replace(' ', ''),
 
                                                     self.LDAP_SERVER_PORT))
 
        self.LDAP_SERVER = str(', '.join(s for s in _LDAP_SERVERS))
 
        self.BASE_DN = safe_str(base_dn)
 
        self.LDAP_FILTER = safe_str(ldap_filter)
 
        self.SEARCH_SCOPE = getattr(ldap, 'SCOPE_%s' % search_scope)
 
        self.attr_login = attr_login
 

	
 
    def authenticate_ldap(self, username, password):
 
        """
 
        Authenticate a user via LDAP and return his/her LDAP properties.
 

	
 
        Raises AuthenticationError if the credentials are rejected, or
 
        EnvironmentError if the LDAP server can't be reached.
 

	
 
        :param username: username
 
        :param password: password
 
        """
 

	
 
        from kallithea.lib.helpers import chop_at
 

	
 
        uid = chop_at(username, "@%s" % self.LDAP_SERVER_ADDRESS)
 

	
 
        if not password:
 
            log.debug("Attempt to authenticate LDAP user "
 
                      "with blank password rejected.")
 
            raise LdapPasswordError()
 
        if "," in username:
 
            raise LdapUsernameError("invalid character in username: ,")
 
        try:
 
            if hasattr(ldap, 'OPT_X_TLS_CACERTDIR'):
 
                ldap.set_option(ldap.OPT_X_TLS_CACERTDIR,
 
                                '/etc/openldap/cacerts')
 
            ldap.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF)
 
            ldap.set_option(ldap.OPT_RESTART, ldap.OPT_ON)
 
            ldap.set_option(ldap.OPT_TIMEOUT, 20)
 
            ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
 
            ldap.set_option(ldap.OPT_TIMELIMIT, 15)
 
            if self.TLS_KIND != 'PLAIN':
 
                ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, self.TLS_REQCERT)
 
            server = ldap.initialize(self.LDAP_SERVER)
 
            if self.ldap_version == 2:
 
                server.protocol = ldap.VERSION2
 
            else:
 
                server.protocol = ldap.VERSION3
 

	
 
            if self.TLS_KIND == 'START_TLS':
 
                server.start_tls_s()
 

	
 
            if self.LDAP_BIND_DN and self.LDAP_BIND_PASS:
 
                log.debug('Trying simple_bind with password and given DN: %s'
 
                          % self.LDAP_BIND_DN)
 
                server.simple_bind_s(self.LDAP_BIND_DN, self.LDAP_BIND_PASS)
 

	
 
            filter_ = '(&%s(%s=%s))' % (self.LDAP_FILTER, self.attr_login,
 
                                        username)
 
            log.debug("Authenticating %r filter %s at %s", self.BASE_DN,
 
                      filter_, self.LDAP_SERVER)
 
            lobjects = server.search_ext_s(self.BASE_DN, self.SEARCH_SCOPE,
 
                                           filter_)
 

	
 
            if not lobjects:
 
                raise ldap.NO_SUCH_OBJECT()
 

	
 
            for (dn, _attrs) in lobjects:
 
                if dn is None:
 
                    continue
 

	
 
                try:
 
                    log.debug('Trying simple bind with %s' % dn)
 
                    server.simple_bind_s(dn, safe_str(password))
 
                    attrs = server.search_ext_s(dn, ldap.SCOPE_BASE,
 
                                                '(objectClass=*)')[0][1]
 
                    break
 

	
 
                except ldap.INVALID_CREDENTIALS:
 
                    log.debug("LDAP rejected password for user '%s' (%s): %s"
 
                              % (uid, username, dn))
 

	
 
            else:
 
                log.debug("No matching LDAP objects for authentication "
 
                          "of '%s' (%s)", uid, username)
 
                raise LdapPasswordError()
 

	
 
        except ldap.NO_SUCH_OBJECT:
 
            log.debug("LDAP says no such user '%s' (%s)" % (uid, username))
 
            raise LdapUsernameError()
 
        except ldap.SERVER_DOWN:
 
            raise LdapConnectionError("LDAP can't access authentication server")
 

	
 
        return dn, attrs
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
 
    def __init__(self):
 
        self._logger = logging.getLogger(__name__)
 
        self._tls_kind_values = ["PLAIN", "LDAPS", "START_TLS"]
 
        self._tls_reqcert_values = ["NEVER", "ALLOW", "TRY", "DEMAND", "HARD"]
 
        self._search_scopes = ["BASE", "ONELEVEL", "SUBTREE"]
 

	
 
    @hybrid_property
 
    def name(self):
 
        return "ldap"
 

	
 
    def settings(self):
 
        settings = [
 
            {
 
                "name": "host",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Host of the LDAP Server",
 
                "formname": "LDAP Host"
 
            },
 
            {
 
                "name": "port",
 
                "validator": self.validators.Number(strip=True, not_empty=True),
 
                "type": "string",
 
                "description": "Port that the LDAP server is listening on",
 
                "default": 389,
 
                "formname": "Port"
 
            },
 
            {
 
                "name": "dn_user",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "User to connect to LDAP",
 
                "formname": "Account"
 
            },
 
            {
 
                "name": "dn_pass",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "password",
 
                "description": "Password to connect to LDAP",
 
                "formname": "Password"
 
            },
 
            {
 
                "name": "tls_kind",
 
                "validator": self.validators.OneOf(self._tls_kind_values),
 
                "type": "select",
 
                "values": self._tls_kind_values,
 
                "description": "TLS Type",
 
                "default": 'PLAIN',
 
                "formname": "Connection Security"
 
            },
 
            {
 
                "name": "tls_reqcert",
 
                "validator": self.validators.OneOf(self._tls_reqcert_values),
 
                "type": "select",
 
                "values": self._tls_reqcert_values,
 
                "description": "Require Cert over TLS?",
 
                "formname": "Certificate Checks"
 
            },
 
            {
 
                "name": "base_dn",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Base DN to search (e.g., dc=mydomain,dc=com)",
 
                "formname": "Base DN"
 
            },
 
            {
 
                "name": "filter",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Filter to narrow results (e.g., ou=Users, etc)",
 
                "formname": "LDAP Search Filter"
 
            },
 
            {
 
                "name": "search_scope",
 
                "validator": self.validators.OneOf(self._search_scopes),
 
                "type": "select",
 
                "values": self._search_scopes,
 
                "description": "How deep to search LDAP",
 
                "formname": "LDAP Search Scope"
 
            },
 
            {
 
                "name": "attr_login",
 
                "validator": self.validators.AttrLoginValidator(not_empty=True, strip=True),
 
                "type": "string",
 
                "description": "LDAP Attribute to map to user name",
 
                "formname": "Login Attribute"
 
            },
 
            {
 
                "name": "attr_firstname",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "LDAP Attribute to map to first name",
 
                "formname": "First Name Attribute"
 
            },
 
            {
 
                "name": "attr_lastname",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "LDAP Attribute to map to last name",
 
                "formname": "Last Name Attribute"
 
            },
 
            {
 
                "name": "attr_email",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "LDAP Attribute to map to email address",
 
                "formname": "Email Attribute"
 
            }
 
        ]
 
        return settings
 

	
 
    def use_fake_password(self):
 
        return True
 

	
 
    def user_activation_state(self):
 
        def_user_perms = User.get_default_user().AuthUser.permissions['global']
 
        return 'hg.extern_activate.auto' in def_user_perms
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        """
 
        Given a user object (which may be null), username, a plaintext password,
 
        and a settings object (containing all the keys needed as listed in settings()),
 
        authenticate this user's login attempt.
 

	
 
        Return None on failure. On success, return a dictionary of the form:
 

	
 
            see: KallitheaAuthPluginBase.auth_func_attrs
 
        This is later validated for correctness
 
        """
 

	
 
        if not username or not password:
 
            log.debug('Empty username or password skipping...')
 
            return None
 

	
 
        kwargs = {
 
            'server': settings.get('host', ''),
 
            'base_dn': settings.get('base_dn', ''),
 
            'port': settings.get('port'),
 
            'bind_dn': settings.get('dn_user'),
 
            'bind_pass': settings.get('dn_pass'),
 
            'tls_kind': settings.get('tls_kind'),
 
            'tls_reqcert': settings.get('tls_reqcert'),
 
            'ldap_filter': settings.get('filter'),
 
            'search_scope': settings.get('search_scope'),
 
            'attr_login': settings.get('attr_login'),
 
            'ldap_version': 3,
 
        }
 

	
 
        if kwargs['bind_dn'] and not kwargs['bind_pass']:
 
            log.debug('Using dynamic binding.')
 
            kwargs['bind_dn'] = kwargs['bind_dn'].replace('$login', username)
 
            kwargs['bind_pass'] = password
 
        log.debug('Checking for ldap authentication')
 

	
 
        try:
 
            aldap = AuthLdap(**kwargs)
 
            (user_dn, ldap_attrs) = aldap.authenticate_ldap(username, password)
 
            log.debug('Got ldap DN response %s' % user_dn)
 

	
 
            get_ldap_attr = lambda k: ldap_attrs.get(settings.get(k), [''])[0]
 

	
 
            # old attrs fetched from Kallithea database
 
            admin = getattr(userobj, 'admin', False)
 
            active = getattr(userobj, 'active', self.user_activation_state())
 
            email = getattr(userobj, 'email', '')
 
            firstname = getattr(userobj, 'firstname', '')
 
            lastname = getattr(userobj, 'lastname', '')
 
            extern_type = getattr(userobj, 'extern_type', '')
 

	
 
            user_attrs = {
 
            user_data = {
 
                'username': username,
 
                'firstname': safe_unicode(get_ldap_attr('attr_firstname') or firstname),
 
                'lastname': safe_unicode(get_ldap_attr('attr_lastname') or lastname),
 
                'groups': [],
 
                'email': get_ldap_attr('attr_email') or email,
 
                'admin': admin,
 
                'active': active,
 
                "active_from_extern": None,
 
                'extern_name': user_dn,
 
                'extern_type': extern_type,
 
            }
 
            log.info('user %s authenticated correctly' % user_attrs['username'])
 
            return user_attrs
 
            log.info('user %s authenticated correctly' % user_data['username'])
 
            return user_data
 

	
 
        except (LdapUsernameError, LdapPasswordError, LdapImportError):
 
            log.error(traceback.format_exc())
 
            return None
 
        except (Exception,):
 
            log.error(traceback.format_exc())
 
            return None
kallithea/lib/auth_modules/auth_pam.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth_pam
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea authentication library for PAM
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Created on Apr 09, 2013
 
:author: Alexey Larikov
 
"""
 

	
 
import logging
 
import time
 
import pam
 
import pwd
 
import grp
 
import re
 
import socket
 
import threading
 

	
 
from kallithea.lib import auth_modules
 
from kallithea.lib.compat import formatted_json, hybrid_property
 

	
 
log = logging.getLogger(__name__)
 

	
 
# Cache to store PAM authenticated users
 
_auth_cache = dict()
 
_pam_lock = threading.Lock()
 

	
 

	
 
class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
 
    # PAM authnetication can be slow. Repository operations involve a lot of
 
    # auth calls. Little caching helps speedup push/pull operations significantly
 
    AUTH_CACHE_TTL = 4
 

	
 
    def __init__(self):
 
        global _auth_cache
 
        ts = time.time()
 
        cleared_cache = dict(
 
            [(k, v) for (k, v) in _auth_cache.items() if
 
             (v + KallitheaAuthPlugin.AUTH_CACHE_TTL > ts)])
 
        _auth_cache = cleared_cache
 

	
 
    @hybrid_property
 
    def name(self):
 
        return "pam"
 

	
 
    def settings(self):
 
        settings = [
 
            {
 
                "name": "service",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "PAM service name to use for authentication",
 
                "default": "login",
 
                "formname": "PAM service name"
 
            },
 
            {
 
                "name": "gecos",
 
                "validator": self.validators.UnicodeString(strip=True),
 
                "type": "string",
 
                "description": "Regex for extracting user name/email etc "
 
                               "from Unix userinfo",
 
                "default": "(?P<last_name>.+),\s*(?P<first_name>\w+)",
 
                "formname": "Gecos Regex"
 
            }
 
        ]
 
        return settings
 

	
 
    def use_fake_password(self):
 
        return True
 

	
 
    def auth(self, userobj, username, password, settings, **kwargs):
 
        if username not in _auth_cache:
 
            # Need lock here, as PAM authentication is not thread safe
 
            _pam_lock.acquire()
 
            try:
 
                auth_result = pam.authenticate(username, password,
 
                                               settings["service"])
 
                # cache result only if we properly authenticated
 
                if auth_result:
 
                    _auth_cache[username] = time.time()
 
            finally:
 
                _pam_lock.release()
 

	
 
            if not auth_result:
 
                log.error("PAM was unable to authenticate user: %s" % (username,))
 
                return None
 
        else:
 
            log.debug("Using cached auth for user: %s" % (username,))
 

	
 
        # old attrs fetched from Kallithea database
 
        admin = getattr(userobj, 'admin', False)
 
        active = getattr(userobj, 'active', True)
 
        email = getattr(userobj, 'email', '') or "%s@%s" % (username, socket.gethostname())
 
        firstname = getattr(userobj, 'firstname', '')
 
        lastname = getattr(userobj, 'lastname', '')
 
        extern_type = getattr(userobj, 'extern_type', '')
 

	
 
        user_attrs = {
 
        user_data = {
 
            'username': username,
 
            'firstname': firstname,
 
            'lastname': lastname,
 
            'groups': [g.gr_name for g in grp.getgrall() if username in g.gr_mem],
 
            'email': email,
 
            'admin': admin,
 
            'active': active,
 
            "active_from_extern": None,
 
            'extern_name': username,
 
            'extern_type': extern_type,
 
        }
 

	
 
        try:
 
            user_data = pwd.getpwnam(username)
 
            regex = settings["gecos"]
 
            match = re.search(regex, user_data.pw_gecos)
 
            if match:
 
                user_attrs["firstname"] = match.group('first_name')
 
                user_attrs["lastname"] = match.group('last_name')
 
                user_data["firstname"] = match.group('first_name')
 
                user_data["lastname"] = match.group('last_name')
 
        except Exception:
 
            log.warning("Cannot extract additional info for PAM user %s", username)
 
            pass
 

	
 
        log.debug("pamuser: \n%s" % formatted_json(user_attrs))
 
        log.info('user %s authenticated correctly' % user_attrs['username'])
 
        return user_attrs
 
        log.debug("pamuser: \n%s" % formatted_json(user_data))
 
        log.info('user %s authenticated correctly' % user_data['username'])
 
        return user_data
kallithea/lib/base.py
Show inline comments
 
@@ -27,495 +27,495 @@ Original author and date, and relevant c
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 
import time
 
import traceback
 

	
 
import webob.exc
 
import paste.httpexceptions
 
import paste.auth.basic
 
import paste.httpheaders
 

	
 
from pylons import config, tmpl_context as c, request, session, url
 
from pylons.controllers import WSGIController
 
from pylons.controllers.util import redirect
 
from pylons.templating import render_mako as render  # don't remove this import
 
from pylons.i18n.translation import _
 

	
 
from kallithea import __version__, BACKENDS
 

	
 
from kallithea.lib.utils2 import str2bool, safe_unicode, AttributeDict,\
 
    safe_str, safe_int
 
from kallithea.lib import auth_modules
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyMiddleware
 
from kallithea.lib.utils import get_repo_slug
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.vcs.exceptions import RepositoryError, EmptyRepositoryError, ChangesetDoesNotExistError
 
from kallithea.model import meta
 

	
 
from kallithea.model.db import Repository, Ui, User, Setting
 
from kallithea.model.notification import NotificationModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.pull_request import PullRequestModel
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _filter_proxy(ip):
 
    """
 
    HEADERS can have multiple ips inside the left-most being the original
 
    client, and each successive proxy that passed the request adding the IP
 
    address where it received the request from.
 

	
 
    :param ip:
 
    """
 
    if ',' in ip:
 
        _ips = ip.split(',')
 
        _first_ip = _ips[0].strip()
 
        log.debug('Got multiple IPs %s, using %s' % (','.join(_ips), _first_ip))
 
        return _first_ip
 
    return ip
 

	
 

	
 
def _get_ip_addr(environ):
 
    proxy_key = 'HTTP_X_REAL_IP'
 
    proxy_key2 = 'HTTP_X_FORWARDED_FOR'
 
    def_key = 'REMOTE_ADDR'
 

	
 
    ip = environ.get(proxy_key)
 
    if ip:
 
        return _filter_proxy(ip)
 

	
 
    ip = environ.get(proxy_key2)
 
    if ip:
 
        return _filter_proxy(ip)
 

	
 
    ip = environ.get(def_key, '0.0.0.0')
 
    return _filter_proxy(ip)
 

	
 

	
 
def _get_access_path(environ):
 
    path = environ.get('PATH_INFO')
 
    org_req = environ.get('pylons.original_request')
 
    if org_req:
 
        path = org_req.environ.get('PATH_INFO')
 
    return path
 

	
 

	
 
def log_in_user(user, remember, is_external_auth):
 
    """
 
    Log a `User` in and update session and cookies. If `remember` is True,
 
    the session cookie is set to expire in a year; otherwise, it expires at
 
    the end of the browser session.
 

	
 
    Returns populated `AuthUser` object.
 
    """
 
    user.update_lastlogin()
 
    meta.Session().commit()
 

	
 
    auth_user = AuthUser(dbuser=user,
 
                         is_external_auth=is_external_auth)
 
    auth_user.set_authenticated()
 

	
 
    # Start new session to prevent session fixation attacks.
 
    session.invalidate()
 
    session['authuser'] = cookie = auth_user.to_cookie()
 

	
 
    # If they want to be remembered, update the cookie
 
    if remember:
 
        t = datetime.datetime.now() + datetime.timedelta(days=365)
 
        session._set_cookie_expires(t)
 

	
 
    session.save()
 

	
 
    log.info('user %s is now authenticated and stored in '
 
             'session, session attrs %s', user.username, cookie)
 

	
 
    # dumps session attrs back to cookie
 
    session._update_cookie_out()
 

	
 
    return auth_user
 

	
 

	
 
class BasicAuth(paste.auth.basic.AuthBasicAuthenticator):
 

	
 
    def __init__(self, realm, authfunc, auth_http_code=None):
 
        self.realm = realm
 
        self.authfunc = authfunc
 
        self._rc_auth_http_code = auth_http_code
 

	
 
    def build_authentication(self):
 
        head = paste.httpheaders.WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
 
        if self._rc_auth_http_code and self._rc_auth_http_code == '403':
 
            # return 403 if alternative http return code is specified in
 
            # Kallithea config
 
            return paste.httpexceptions.HTTPForbidden(headers=head)
 
        return paste.httpexceptions.HTTPUnauthorized(headers=head)
 

	
 
    def authenticate(self, environ):
 
        authorization = paste.httpheaders.AUTHORIZATION(environ)
 
        if not authorization:
 
            return self.build_authentication()
 
        (authmeth, auth) = authorization.split(' ', 1)
 
        if 'basic' != authmeth.lower():
 
            return self.build_authentication()
 
        auth = auth.strip().decode('base64')
 
        _parts = auth.split(':', 1)
 
        if len(_parts) == 2:
 
            username, password = _parts
 
            if self.authfunc(username, password, environ) is not None:
 
                return username
 
        return self.build_authentication()
 

	
 
    __call__ = authenticate
 

	
 

	
 
class BaseVCSController(object):
 

	
 
    def __init__(self, application, config):
 
        self.application = application
 
        self.config = config
 
        # base path of repo locations
 
        self.basepath = self.config['base_path']
 
        # authenticate this VCS request using the authentication modules
 
        self.authenticate = BasicAuth('', auth_modules.authenticate,
 
                                      config.get('auth_ret_code'))
 
        self.ip_addr = '0.0.0.0'
 

	
 
    def _handle_request(self, environ, start_response):
 
        raise NotImplementedError()
 

	
 
    def _get_by_id(self, repo_name):
 
        """
 
        Gets a special pattern _<ID> from clone url and tries to replace it
 
        with a repository_name for support of _<ID> permanent URLs
 

	
 
        :param repo_name:
 
        """
 

	
 
        data = repo_name.split('/')
 
        if len(data) >= 2:
 
            from kallithea.lib.utils import get_repo_by_id
 
            by_id_match = get_repo_by_id(repo_name)
 
            if by_id_match:
 
                data[1] = by_id_match
 

	
 
        return '/'.join(data)
 

	
 
    def _invalidate_cache(self, repo_name):
 
        """
 
        Sets cache for this repository for invalidation on next access
 

	
 
        :param repo_name: full repo name, also a cache key
 
        """
 
        ScmModel().mark_for_invalidation(repo_name)
 

	
 
    def _check_permission(self, action, user, repo_name, ip_addr=None):
 
        """
 
        Checks permissions using action (push/pull) user and repository
 
        name
 

	
 
        :param action: push or pull action
 
        :param user: `User` instance
 
        :param repo_name: repository name
 
        """
 
        # check IP
 
        ip_allowed = AuthUser.check_ip_allowed(user, ip_addr)
 
        if ip_allowed:
 
            log.info('Access for IP:%s allowed' % (ip_addr,))
 
        else:
 
            return False
 

	
 
        if action == 'push':
 
            if not HasPermissionAnyMiddleware('repository.write',
 
                                              'repository.admin')(user,
 
                                                                  repo_name):
 
                return False
 

	
 
        else:
 
            #any other action need at least read permission
 
            if not HasPermissionAnyMiddleware('repository.read',
 
                                              'repository.write',
 
                                              'repository.admin')(user,
 
                                                                  repo_name):
 
                return False
 

	
 
        return True
 

	
 
    def _get_ip_addr(self, environ):
 
        return _get_ip_addr(environ)
 

	
 
    def _check_ssl(self, environ):
 
        """
 
        Checks the SSL check flag and returns False if SSL is not present
 
        and required True otherwise
 
        """
 
        #check if we have SSL required  ! if not it's a bad request !
 
        if str2bool(Ui.get_by_key('push_ssl').ui_value):
 
            org_proto = environ.get('wsgi._org_proto', environ['wsgi.url_scheme'])
 
            if org_proto != 'https':
 
                log.debug('proto is %s and SSL is required BAD REQUEST !'
 
                          % org_proto)
 
                return False
 
        return True
 

	
 
    def _check_locking_state(self, environ, action, repo, user_id):
 
        """
 
        Checks locking on this repository, if locking is enabled and lock is
 
        present returns a tuple of make_lock, locked, locked_by.
 
        make_lock can have 3 states None (do nothing) True, make lock
 
        False release lock, This value is later propagated to hooks, which
 
        do the locking. Think about this as signals passed to hooks what to do.
 

	
 
        """
 
        locked = False  # defines that locked error should be thrown to user
 
        make_lock = None
 
        repo = Repository.get_by_repo_name(repo)
 
        user = User.get(user_id)
 

	
 
        # this is kind of hacky, but due to how mercurial handles client-server
 
        # server see all operation on changeset; bookmarks, phases and
 
        # obsolescence marker in different transaction, we don't want to check
 
        # locking on those
 
        obsolete_call = environ['QUERY_STRING'] in ['cmd=listkeys',]
 
        locked_by = repo.locked
 
        if repo and repo.enable_locking and not obsolete_call:
 
            if action == 'push':
 
                #check if it's already locked !, if it is compare users
 
                user_id, _date = repo.locked
 
                if user.user_id == user_id:
 
                    log.debug('Got push from user %s, now unlocking' % (user))
 
                    # unlock if we have push from user who locked
 
                    make_lock = False
 
                else:
 
                    # we're not the same user who locked, ban with 423 !
 
                    locked = True
 
            if action == 'pull':
 
                if repo.locked[0] and repo.locked[1]:
 
                    locked = True
 
                else:
 
                    log.debug('Setting lock on repo %s by %s' % (repo, user))
 
                    make_lock = True
 

	
 
        else:
 
            log.debug('Repository %s do not have locking enabled' % (repo))
 
        log.debug('FINAL locking values make_lock:%s,locked:%s,locked_by:%s'
 
                  % (make_lock, locked, locked_by))
 
        return make_lock, locked, locked_by
 

	
 
    def __call__(self, environ, start_response):
 
        start = time.time()
 
        try:
 
            return self._handle_request(environ, start_response)
 
        finally:
 
            log = logging.getLogger('kallithea.' + self.__class__.__name__)
 
            log.debug('Request time: %.3fs' % (time.time() - start))
 
            meta.Session.remove()
 

	
 

	
 
class BaseController(WSGIController):
 

	
 
    def __before__(self):
 
        """
 
        __before__ is called before controller methods and after __call__
 
        """
 
        c.kallithea_version = __version__
 
        rc_config = Setting.get_app_settings()
 

	
 
        # Visual options
 
        c.visual = AttributeDict({})
 

	
 
        ## DB stored
 
        c.visual.show_public_icon = str2bool(rc_config.get('show_public_icon'))
 
        c.visual.show_private_icon = str2bool(rc_config.get('show_private_icon'))
 
        c.visual.stylify_metatags = str2bool(rc_config.get('stylify_metatags'))
 
        c.visual.dashboard_items = safe_int(rc_config.get('dashboard_items', 100))
 
        c.visual.admin_grid_items = safe_int(rc_config.get('admin_grid_items', 100))
 
        c.visual.repository_fields = str2bool(rc_config.get('repository_fields'))
 
        c.visual.show_version = str2bool(rc_config.get('show_version'))
 
        c.visual.use_gravatar = str2bool(rc_config.get('use_gravatar'))
 
        c.visual.gravatar_url = rc_config.get('gravatar_url')
 

	
 
        c.ga_code = rc_config.get('ga_code')
 
        # TODO: replace undocumented backwards compatibility hack with db upgrade and rename ga_code
 
        if c.ga_code and '<' not in c.ga_code:
 
            c.ga_code = '''<script type="text/javascript">
 
                var _gaq = _gaq || [];
 
                _gaq.push(['_setAccount', '%s']);
 
                _gaq.push(['_trackPageview']);
 

	
 
                (function() {
 
                    var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
 
                    ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
 
                    var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
 
                    })();
 
            </script>''' % c.ga_code
 
        c.site_name = rc_config.get('title')
 
        c.clone_uri_tmpl = rc_config.get('clone_uri_tmpl')
 

	
 
        ## INI stored
 
        c.visual.allow_repo_location_change = str2bool(config.get('allow_repo_location_change', True))
 
        c.visual.allow_custom_hooks_settings = str2bool(config.get('allow_custom_hooks_settings', True))
 

	
 
        c.instance_id = config.get('instance_id')
 
        c.issues_url = config.get('bugtracker', url('issues_url'))
 
        # END CONFIG VARS
 

	
 
        c.repo_name = get_repo_slug(request)  # can be empty
 
        c.backends = BACKENDS.keys()
 
        c.unread_notifications = NotificationModel()\
 
                        .get_unread_cnt_for_user(c.authuser.user_id)
 

	
 
        self.cut_off_limit = safe_int(config.get('cut_off_limit'))
 

	
 
        c.my_pr_count = PullRequestModel().get_pullrequest_cnt_for_user(c.authuser.user_id)
 

	
 
        self.sa = meta.Session
 
        self.scm_model = ScmModel(self.sa)
 

	
 
    @staticmethod
 
    def _determine_auth_user(api_key, session_authuser):
 
        """
 
        Create an `AuthUser` object given the API key (if any) and the
 
        value of the authuser session cookie.
 
        """
 

	
 
        # Authenticate by API key
 
        if api_key:
 
            # when using API_KEY we are sure user exists.
 
            return AuthUser(dbuser=User.get_by_api_key(api_key),
 
                            is_external_auth=True)
 

	
 
        # Authenticate by session cookie
 
        # In ancient login sessions, 'authuser' may not be a dict.
 
        # In that case, the user will have to log in again.
 
        if isinstance(session_authuser, dict):
 
            try:
 
                return AuthUser.from_cookie(session_authuser)
 
            except UserCreationError as e:
 
                # container auth or other auth functions that create users on
 
                # the fly can throw UserCreationError to signal issues with
 
                # user creation. Explanation should be provided in the
 
                # exception object.
 
                from kallithea.lib import helpers as h
 
                h.flash(e, 'error', logf=log.error)
 

	
 
        # Authenticate by auth_container plugin (if enabled)
 
        if any(
 
            auth_modules.importplugin(name).is_container_auth
 
            for name in Setting.get_auth_plugins()
 
        ):
 
            try:
 
                auth_info = auth_modules.authenticate('', '', request.environ)
 
                user_info = auth_modules.authenticate('', '', request.environ)
 
            except UserCreationError as e:
 
                from kallithea.lib import helpers as h
 
                h.flash(e, 'error', logf=log.error)
 
            else:
 
                if auth_info is not None:
 
                    username = auth_info['username']
 
                if user_info is not None:
 
                    username = user_info['username']
 
                    user = User.get_by_username(username, case_insensitive=True)
 
                    return log_in_user(user, remember=False,
 
                                       is_external_auth=True)
 

	
 
        # User is anonymous
 
        return AuthUser()
 

	
 
    def __call__(self, environ, start_response):
 
        """Invoke the Controller"""
 

	
 
        # WSGIController.__call__ dispatches to the Controller method
 
        # the request is routed to. This routing information is
 
        # available in environ['pylons.routes_dict']
 
        try:
 
            self.ip_addr = _get_ip_addr(environ)
 
            # make sure that we update permissions each time we call controller
 

	
 
            #set globals for auth user
 
            self.authuser = c.authuser = request.user = self._determine_auth_user(
 
                request.GET.get('api_key'),
 
                session.get('authuser'),
 
            )
 

	
 
            log.info('IP: %s User: %s accessed %s',
 
                self.ip_addr, self.authuser,
 
                safe_unicode(_get_access_path(environ)),
 
            )
 
            return WSGIController.__call__(self, environ, start_response)
 
        finally:
 
            meta.Session.remove()
 

	
 

	
 
class BaseRepoController(BaseController):
 
    """
 
    Base class for controllers responsible for loading all needed data for
 
    repository loaded items are
 

	
 
    c.db_repo_scm_instance: instance of scm repository
 
    c.db_repo: instance of db
 
    c.repository_followers: number of followers
 
    c.repository_forks: number of forks
 
    c.repository_following: weather the current user is following the current repo
 
    """
 

	
 
    def __before__(self):
 
        super(BaseRepoController, self).__before__()
 
        if c.repo_name:  # extracted from routes
 
            _dbr = Repository.get_by_repo_name(c.repo_name)
 
            if not _dbr:
 
                return
 

	
 
            log.debug('Found repository in database %s with state `%s`'
 
                      % (safe_unicode(_dbr), safe_unicode(_dbr.repo_state)))
 
            route = getattr(request.environ.get('routes.route'), 'name', '')
 

	
 
            # allow to delete repos that are somehow damages in filesystem
 
            if route in ['delete_repo']:
 
                return
 

	
 
            if _dbr.repo_state in [Repository.STATE_PENDING]:
 
                if route in ['repo_creating_home']:
 
                    return
 
                check_url = url('repo_creating_home', repo_name=c.repo_name)
 
                return redirect(check_url)
 

	
 
            dbr = c.db_repo = _dbr
 
            c.db_repo_scm_instance = c.db_repo.scm_instance
 
            if c.db_repo_scm_instance is None:
 
                log.error('%s this repository is present in database but it '
 
                          'cannot be created as an scm instance', c.repo_name)
 
                from kallithea.lib import helpers as h
 
                h.flash(h.literal(_('Repository not found in the filesystem')),
 
                        category='error')
 
                raise paste.httpexceptions.HTTPNotFound()
 

	
 
            # some globals counter for menu
 
            c.repository_followers = self.scm_model.get_followers(dbr)
 
            c.repository_forks = self.scm_model.get_forks(dbr)
 
            c.repository_pull_requests = self.scm_model.get_pull_requests(dbr)
 
            c.repository_following = self.scm_model.is_following_repo(
 
                                    c.repo_name, self.authuser.user_id)
 

	
 
    @staticmethod
 
    def _get_ref_rev(repo, ref_type, ref_name, returnempty=False):
 
        """
 
        Safe way to get changeset. If error occurs show error.
 
        """
 
        from kallithea.lib import helpers as h
 
        try:
 
            return repo.scm_instance.get_ref_revision(ref_type, ref_name)
 
        except EmptyRepositoryError as e:
 
            if returnempty:
 
                return repo.scm_instance.EMPTY_CHANGESET
 
            h.flash(h.literal(_('There are no changesets yet')),
 
                    category='error')
 
            raise webob.exc.HTTPNotFound()
 
        except ChangesetDoesNotExistError as e:
 
            h.flash(h.literal(_('Changeset not found')),
 
                    category='error')
 
            raise webob.exc.HTTPNotFound()
 
        except RepositoryError as e:
 
            log.error(traceback.format_exc())
 
            h.flash(safe_str(e), category='error')
 
            raise webob.exc.HTTPBadRequest()
0 comments (0 inline, 0 general)