Files
@ cb472dfe807d
Branch filter:
Location: kallithea/kallithea/lib/auth_modules/auth_ldap.py
cb472dfe807d
14.6 KiB
text/x-python
auth: drop active_from_extern from internal auth API
Modules should never auth a user if the auth source knows the user is inactive.
Also, it is too late and unreliable to disable users when they try to log in.
There is thus no need for this concept.
Only the crowd module had some traces of actual active_from_extern usage. The
'active' flag for crowd users was fully controlled from crowd. Now, instead,
just let crowd reject authentication of users that are inactive in crowd, and
leave the internal Kallithea 'active' flag under admin control.
Modules should never auth a user if the auth source knows the user is inactive.
Also, it is too late and unreliable to disable users when they try to log in.
There is thus no need for this concept.
Only the crowd module had some traces of actual active_from_extern usage. The
'active' flag for crowd users was fully controlled from crowd. Now, instead,
just let crowd reject authentication of users that are inactive in crowd, and
leave the internal Kallithea 'active' flag under admin control.
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.lib.auth_modules.auth_ldap
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Kallithea authentication plugin for LDAP
This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Created on Nov 17, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""
import logging
import traceback
from kallithea.lib import auth_modules
from kallithea.lib.compat import hybrid_property
from kallithea.lib.utils2 import safe_unicode, safe_str
from kallithea.lib.exceptions import (
LdapConnectionError, LdapUsernameError, LdapPasswordError, LdapImportError
)
from kallithea.model.db import User
log = logging.getLogger(__name__)
try:
import ldap
import ldap.filter
except ImportError:
# means that python-ldap is not installed
ldap = None
class AuthLdap(object):
    """
    Small wrapper around python-ldap: searches for a user entry below a base
    DN and verifies the password by binding as the entry that was found.
    """

    def __init__(self, server, base_dn, port=None, bind_dn='', bind_pass='',
                 tls_kind='LDAPS', tls_reqcert='DEMAND', cacertdir=None, ldap_version=3,
                 ldap_filter='(&(objectClass=user)(!(objectClass=computer)))',
                 search_scope='SUBTREE', attr_login='uid'):
        """
        Store the connection and search parameters; no connection is made here.

        :param server: host name, or several host names separated by commas
        :param base_dn: DN below which users are searched
        :param port: server port; when falsy, 636 is used for LDAPS and 389
            otherwise
        :param bind_dn: DN used for the initial bind before searching
        :param bind_pass: password for ``bind_dn``
        :param tls_kind: 'LDAPS' selects the ldaps:// scheme, 'START_TLS'
            upgrades the connection after connecting, 'PLAIN' skips the
            certificate requirement option
        :param tls_reqcert: suffix of the python-ldap ``OPT_X_TLS_*`` constant
            that is used as certificate requirement
        :param cacertdir: optional directory with CA certificates
        :param ldap_version: 2 selects LDAPv2, anything else LDAPv3
        :param ldap_filter: filter that is AND-ed with the login attribute match
        :param search_scope: suffix of the python-ldap ``SCOPE_*`` constant
        :param attr_login: name of the attribute holding the login name

        Raises LdapImportError if python-ldap could not be imported.
        """
        if ldap is None:
            raise LdapImportError

        self.ldap_version = ldap_version

        self.TLS_KIND = tls_kind
        # Fallback used when python-ldap has no OPT_X_TLS_<tls_reqcert> constant.
        OPT_X_TLS_DEMAND = 2
        self.TLS_REQCERT = getattr(ldap, 'OPT_X_TLS_%s' % tls_reqcert,
                                   OPT_X_TLS_DEMAND)
        self.cacertdir = cacertdir

        protocol = 'ldaps' if self.TLS_KIND == 'LDAPS' else 'ldap'
        if not port:
            port = 636 if self.TLS_KIND == 'LDAPS' else 389
        # One URI per configured host, all with the same scheme and port.
        self.LDAP_SERVER = str(', '.join(
            "%s://%s:%s" % (protocol,
                            host.strip(),
                            port)
            for host in server.split(',')))

        self.LDAP_BIND_DN = safe_str(bind_dn)
        self.LDAP_BIND_PASS = safe_str(bind_pass)

        self.BASE_DN = safe_str(base_dn)
        self.LDAP_FILTER = safe_str(ldap_filter)
        self.SEARCH_SCOPE = getattr(ldap, 'SCOPE_%s' % search_scope)
        self.attr_login = attr_login

    def authenticate_ldap(self, username, password):
        """
        Authenticate a user via LDAP and return his/her LDAP properties.

        :param username: username
        :param password: password
        :returns: tuple ``(dn, attrs)`` of the first matching LDAP object that
            accepted the password

        Raises LdapPasswordError if the password is blank or no matching LDAP
        object accepted it, LdapUsernameError if the username contains a comma
        or the search found no object, and LdapConnectionError if python-ldap
        reports SERVER_DOWN. Other python-ldap errors are not handled here.
        """
        if not password:
            log.debug("Attempt to authenticate LDAP user "
                      "with blank password rejected.")
            raise LdapPasswordError()
        if "," in username:
            raise LdapUsernameError("invalid character in username: ,")
        try:
            # These options are set through the ldap module itself, not on the
            # connection object created below.
            if self.cacertdir:
                if hasattr(ldap, 'OPT_X_TLS_CACERTDIR'):
                    ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, self.cacertdir)
                else:
                    log.debug("OPT_X_TLS_CACERTDIR is not available - can't set %s", self.cacertdir)
            ldap.set_option(ldap.OPT_REFERRALS, ldap.OPT_OFF)
            ldap.set_option(ldap.OPT_RESTART, ldap.OPT_ON)
            ldap.set_option(ldap.OPT_TIMEOUT, 20)
            ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
            ldap.set_option(ldap.OPT_TIMELIMIT, 15)
            if self.TLS_KIND != 'PLAIN':
                ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, self.TLS_REQCERT)
            server = ldap.initialize(self.LDAP_SERVER)
            # python-ldap exposes the protocol version of a connection as
            # 'protocol_version'; assigning to any other attribute name only
            # creates an unused attribute and leaves the version unchanged.
            if self.ldap_version == 2:
                server.protocol_version = ldap.VERSION2
            else:
                server.protocol_version = ldap.VERSION3

            if self.TLS_KIND == 'START_TLS':
                server.start_tls_s()

            # Bind with the configured account (if any) before searching.
            if self.LDAP_BIND_DN and self.LDAP_BIND_PASS:
                log.debug('Trying simple_bind with password and given DN: %s',
                          self.LDAP_BIND_DN)
                server.simple_bind_s(self.LDAP_BIND_DN, self.LDAP_BIND_PASS)

            # Both the attribute name and the user supplied login name are
            # escaped before they are placed in the search filter.
            filter_ = '(&%s(%s=%s))' % (self.LDAP_FILTER,
                                        ldap.filter.escape_filter_chars(self.attr_login),
                                        ldap.filter.escape_filter_chars(username))
            log.debug("Authenticating %r filter %s at %s", self.BASE_DN,
                      filter_, self.LDAP_SERVER)
            lobjects = server.search_ext_s(self.BASE_DN, self.SEARCH_SCOPE,
                                           filter_)

            if not lobjects:
                raise ldap.NO_SUCH_OBJECT()

            for (dn, _attrs) in lobjects:
                # Results without a DN cannot be used for binding.
                if dn is None:
                    continue

                try:
                    log.debug('Trying simple bind with %s', dn)
                    server.simple_bind_s(dn, safe_str(password))
                    # Re-read the entry itself, now bound as that user.
                    results = server.search_ext_s(dn, ldap.SCOPE_BASE,
                                                  '(objectClass=*)')
                    if len(results) == 1:
                        dn_, attrs = results[0]
                        assert dn_ == dn
                        return dn, attrs
                    # Any other number of results: fall through and try the
                    # next object found by the search.

                except ldap.INVALID_CREDENTIALS:
                    log.debug("LDAP rejected password for user '%s': %s",
                              username, dn)
                    continue  # accept authentication as another ldap user with same username

            log.debug("No matching LDAP objects for authentication "
                      "of '%s'", username)
            raise LdapPasswordError()

        except ldap.NO_SUCH_OBJECT:
            log.debug("LDAP says no such user '%s'", username)
            raise LdapUsernameError()
        except ldap.SERVER_DOWN:
            # [0] might be {'info': "TLS error -8179:Peer's Certificate issuer is not recognized.", 'desc': "Can't contact LDAP server"}
            raise LdapConnectionError("LDAP can't connect to authentication server")
class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
    """Kallithea authentication plugin that checks credentials against LDAP."""

    def __init__(self):
        self._logger = logging.getLogger(__name__)
        # Allowed values for the three "select" settings below.
        self._tls_kind_values = ["PLAIN", "LDAPS", "START_TLS"]
        self._tls_reqcert_values = ["NEVER", "ALLOW", "TRY", "DEMAND", "HARD"]
        self._search_scopes = ["BASE", "ONELEVEL", "SUBTREE"]

    @hybrid_property
    def name(self):
        """Name of this plugin."""
        return "ldap"

    def settings(self):
        """Return the list of setting descriptions (one dict per setting)."""
        validators = self.validators
        return [
            {
                "name": "host",
                "validator": validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Host of the LDAP Server",
                "formname": "LDAP Host",
            },
            {
                "name": "port",
                "validator": validators.Number(strip=True),
                "type": "string",
                "description": "Port that the LDAP server is listening on. Defaults to 389 for PLAIN/START_TLS and 636 for LDAPS.",
                "default": "",
                "formname": "Custom LDAP Port",
            },
            {
                "name": "dn_user",
                "validator": validators.UnicodeString(strip=True),
                "type": "string",
                "description": "User to connect to LDAP",
                "formname": "Account",
            },
            {
                "name": "dn_pass",
                "validator": validators.UnicodeString(strip=True),
                "type": "password",
                "description": "Password to connect to LDAP",
                "formname": "Password",
            },
            {
                "name": "tls_kind",
                "validator": validators.OneOf(self._tls_kind_values),
                "type": "select",
                "values": self._tls_kind_values,
                "description": "TLS Type",
                "default": 'LDAPS',
                "formname": "Connection Security",
            },
            {
                "name": "tls_reqcert",
                "validator": validators.OneOf(self._tls_reqcert_values),
                "type": "select",
                "values": self._tls_reqcert_values,
                "description": "Require Cert over TLS?",
                "formname": "Certificate Checks",
            },
            {
                "name": "cacertdir",
                "validator": validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Optional: Custom CA certificate directory for validating LDAPS",
                "formname": "Custom CA Certificates",
            },
            {
                "name": "base_dn",
                "validator": validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Base DN to search (e.g., dc=mydomain,dc=com)",
                "formname": "Base DN",
            },
            {
                "name": "filter",
                "validator": validators.UnicodeString(strip=True),
                "type": "string",
                "description": "Filter to narrow results (e.g., ou=Users, etc)",
                "formname": "LDAP Search Filter",
            },
            {
                "name": "search_scope",
                "validator": validators.OneOf(self._search_scopes),
                "type": "select",
                "values": self._search_scopes,
                "description": "How deep to search LDAP",
                "formname": "LDAP Search Scope",
            },
            {
                "name": "attr_login",
                "validator": validators.AttrLoginValidator(not_empty=True, strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to user name",
                "formname": "Login Attribute",
            },
            {
                "name": "attr_firstname",
                "validator": validators.UnicodeString(strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to first name",
                "formname": "First Name Attribute",
            },
            {
                "name": "attr_lastname",
                "validator": validators.UnicodeString(strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to last name",
                "formname": "Last Name Attribute",
            },
            {
                "name": "attr_email",
                "validator": validators.UnicodeString(strip=True),
                "type": "string",
                "description": "LDAP Attribute to map to email address",
                "formname": "Email Attribute",
            },
        ]

    def use_fake_password(self):
        """Always True for this plugin."""
        return True

    def user_activation_state(self):
        """
        Tell whether 'hg.extern_activate.auto' is among the global permissions
        of the default user.
        """
        global_perms = User.get_default_user().AuthUser.permissions['global']
        return 'hg.extern_activate.auto' in global_perms

    def auth(self, userobj, username, password, settings, **kwargs):
        """
        Authenticate a login attempt against LDAP.

        :param userobj: existing user object, may be None
        :param username: login name
        :param password: plaintext password
        :param settings: mapping with the keys listed by settings()
        :returns: a dict of user attributes on success (see
            KallitheaAuthPluginBase.auth_func_attrs; it is validated later),
            or None when the username or password is empty or LDAP reported
            an unknown user, a bad password or is not available.

        Only LdapUsernameError, LdapPasswordError and LdapImportError are
        handled here; any other exception propagates to the caller.
        """
        if not (username and password):
            log.debug('Empty username or password skipping...')
            return None

        ldap_kwargs = {
            'server': settings.get('host', ''),
            'base_dn': settings.get('base_dn', ''),
            'port': settings.get('port'),
            'bind_dn': settings.get('dn_user'),
            'bind_pass': settings.get('dn_pass'),
            'tls_kind': settings.get('tls_kind'),
            'tls_reqcert': settings.get('tls_reqcert'),
            'cacertdir': settings.get('cacertdir'),
            'ldap_filter': settings.get('filter'),
            'search_scope': settings.get('search_scope'),
            'attr_login': settings.get('attr_login'),
            'ldap_version': 3,
        }

        # A bind DN without a bind password means: bind as the user who is
        # logging in, substituting the login name for '$login' in the DN.
        bind_dn_template = ldap_kwargs['bind_dn']
        if bind_dn_template and not ldap_kwargs['bind_pass']:
            log.debug('Using dynamic binding.')
            ldap_kwargs['bind_dn'] = bind_dn_template.replace('$login', username)
            ldap_kwargs['bind_pass'] = password

        log.debug('Checking for ldap authentication')
        try:
            ldap_client = AuthLdap(**ldap_kwargs)
            user_dn, ldap_attrs = ldap_client.authenticate_ldap(username, password)
        except LdapUsernameError:
            log.info('Error authenticating %s with LDAP: User not found', username)
            return None
        except LdapPasswordError:
            log.info('Error authenticating %s with LDAP: Password error', username)
            return None
        except LdapImportError:
            log.error('Error authenticating %s with LDAP: LDAP not available', username)
            return None

        log.debug('Got ldap DN response %s', user_dn)

        def get_ldap_attr(setting_name):
            # First value of the LDAP attribute named by the given setting,
            # or '' if the entry has no such attribute.
            return ldap_attrs.get(settings.get(setting_name), [''])[0]

        # old attrs fetched from Kallithea database
        admin = getattr(userobj, 'admin', False)
        active = getattr(userobj, 'active', self.user_activation_state())
        email = getattr(userobj, 'email', '')
        firstname = getattr(userobj, 'firstname', '')
        lastname = getattr(userobj, 'lastname', '')

        # Values from LDAP win; the stored values are only a fallback.
        user_data = {
            'username': username,
            'firstname': safe_unicode(get_ldap_attr('attr_firstname') or firstname),
            'lastname': safe_unicode(get_ldap_attr('attr_lastname') or lastname),
            'groups': [],
            'email': get_ldap_attr('attr_email') or email,
            'admin': admin,
            'active': active,
            'extern_name': user_dn,
        }
        log.info('user %s authenticated correctly', user_data['username'])
        return user_data

    def get_managed_fields(self):
        """Return the names of the user fields listed as managed by this plugin."""
        return ['username', 'firstname', 'lastname', 'email', 'password']
|