@@ -630,192 +630,193 @@ class AuthUser(object):
inherit_from_default=inherit_from_default)
if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
log.debug('IP:%s is in range of %s' % (ip_addr, allowed_ips))
return True
else:
log.info('Access for IP:%s forbidden, '
'not in %s' % (ip_addr, allowed_ips))
return False
def __repr__(self):
return "<AuthUser('id:%s[%s] ip:%s auth:%s')>"\
% (self.user_id, self.username, self.ip_addr, self.is_authenticated)
def set_authenticated(self, authenticated=True):
if self.user_id != self.anonymous_user.user_id:
self.is_authenticated = authenticated
def get_cookie_store(self):
return {'username': self.username,
'user_id': self.user_id,
'is_authenticated': self.is_authenticated}
@classmethod
def from_cookie_store(cls, cookie_store):
"""
Creates AuthUser from a cookie store
:param cls:
:param cookie_store:
user_id = cookie_store.get('user_id')
username = cookie_store.get('username')
api_key = cookie_store.get('api_key')
return AuthUser(user_id, api_key, username)
def get_allowed_ips(cls, user_id, cache=False, inherit_from_default=False):
_set = set()
if inherit_from_default:
default_ips = UserIpMap.query().filter(UserIpMap.user ==
User.get_default_user(cache=True))
if cache:
default_ips = default_ips.options(FromCache("sql_cache_short",
"get_user_ips_default"))
# populate from default user
for ip in default_ips:
try:
_set.add(ip.ip_addr)
except ObjectDeletedError:
# since we use heavy caching sometimes it happens that we get
# deleted objects here, we just skip them
pass
user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
user_ips = user_ips.options(FromCache("sql_cache_short",
"get_user_ips_%s" % user_id))
for ip in user_ips:
return _set or set(['0.0.0.0/0', '::/0'])
def set_available_permissions(config):
This function will propagate pylons globals with all available defined
permission given in db. We don't want to check each time from db for new
permissions since adding a new permission also requires application restart
ie. to decorate new views with the newly created permission
:param config: current pylons config instance
log.info('getting information about all available permissions')
sa = meta.Session
all_perms = sa.query(Permission).all()
config['available_permissions'] = [x.permission_name for x in all_perms]
finally:
meta.Session.remove()
#==============================================================================
# CHECK DECORATORS
def redirect_to_login(message=None):
from kallithea.lib import helpers as h
p = url.current()
if message:
h.flash(h.literal(message), category='warning')
log.debug('Redirecting to login page, origin: %s' % p)
return redirect(url('login_home', came_from=p))
class LoginRequired(object):
Must be logged in to execute this function else
redirect to login page
:param api_access: if enabled this checks only for valid auth token
and grants access based on valid token
def __init__(self, api_access=False):
self.api_access = api_access
def __call__(self, func):
return decorator(self.__wrapper, func)
def __wrapper(self, func, *fargs, **fkwargs):
cls = fargs[0]
user = cls.authuser
loc = "%s:%s" % (cls.__class__.__name__, func.__name__)
log.debug('Checking access for user %s @ %s' % (user, loc))
# check if our IP is allowed
if not user.ip_allowed:
return redirect_to_login(_('IP %s not allowed' % (user.ip_addr)))
# check if we used an API key and it's a valid one
api_key = request.GET.get('api_key')
if api_key is not None:
# explicit controller is enabled or API is in our whitelist
if self.api_access or allowed_api_access(loc, api_key=api_key):
if api_key in user.api_keys:
log.info('user %s authenticated with API key ****%s @ %s'
% (user, api_key[-4:], loc))
return func(*fargs, **fkwargs)
log.warning('API key ****%s is NOT valid' % api_key[-4:])
return redirect_to_login(_('Invalid API key'))
# controller does not allow API access
log.warning('API access to %s is not allowed' % loc)
return abort(403)
# CSRF protection - POSTs with session auth must contain correct token
if request.POST and user.is_authenticated:
token = request.POST.get(secure_form.token_key)
if not token or token != secure_form.authentication_token():
log.error('CSRF check failed')
log.debug('Checking if %s is authenticated @ %s' % (user.username, loc))
reason = 'RegularAuth' if user.is_authenticated else 'APIAuth'
if user.is_authenticated:
log.info('user %s authenticating with:%s IS authenticated on func %s '
% (user, reason, loc)
)
log.warning('user %s authenticating with:%s NOT authenticated on func: %s: '
return redirect_to_login()
class NotAnonymous(object):
redirect to login page"""
self.user = cls.authuser
log.debug('Checking if user is not anonymous @%s' % cls)
anonymous = self.user.username == User.DEFAULT_USER
if anonymous:
return redirect_to_login(_('You need to be a registered user to '
'perform this action'))
class PermsDecorator(object):
"""Base class for controller decorators"""
def __init__(self, *required_perms):
self.required_perms = set(required_perms)
self.user_perms = None
Status change: