diff --git a/rhodecode/lib/auth.py b/rhodecode/lib/auth.py --- a/rhodecode/lib/auth.py +++ b/rhodecode/lib/auth.py @@ -863,6 +863,109 @@ class HasPermissionAnyMiddleware(object) return False +#============================================================================== +# SPECIAL VERSION TO HANDLE API AUTH +#============================================================================== +class _BaseApiPerm(object): + def __init__(self, *perms): + self.required_perms = set(perms) + + def __call__(self, check_location='unspecified', user=None, repo_name=None): + cls_name = self.__class__.__name__ + check_scope = 'user:%s, repo:%s' % (user, repo_name) + log.debug('checking cls:%s %s %s @ %s', cls_name, + self.required_perms, check_scope, check_location) + if not user: + log.debug('Empty User passed into arguments') + return False + + ## process user + if not isinstance(user, AuthUser): + user = AuthUser(user.user_id) + + if self.check_permissions(user.permissions, repo_name): + log.debug('Permission to %s granted for user: %s @ %s', repo_name, + user, check_location) + return True + + else: + log.debug('Permission to %s denied for user: %s @ %s', repo_name, + user, check_location) + return False + + def check_permissions(self, perm_defs, repo_name): + """ + implement in child class should return True if permissions are ok, + False otherwise + + :param perm_defs: dict with permission definitions + :param repo_name: repo name + """ + raise NotImplementedError() + + +class HasPermissionAllApi(_BaseApiPerm): + def __call__(self, user, check_location=''): + return super(HasPermissionAllApi, self)\ + .__call__(check_location=check_location, user=user) + + def check_permissions(self, perm_defs, repo): + if self.required_perms.issubset(perm_defs.get('global')): + return True + return False + + +class HasPermissionAnyApi(_BaseApiPerm): + def __call__(self, user, check_location=''): + return super(HasPermissionAnyApi, self)\ + .__call__(check_location=check_location, user=user) + + def check_permissions(self, perm_defs, repo): + if self.required_perms.intersection(perm_defs.get('global')): + return True + return False + + +class HasRepoPermissionAllApi(_BaseApiPerm): + def __call__(self, user, repo_name, check_location=''): + return super(HasRepoPermissionAllApi, self)\ + .__call__(check_location=check_location, user=user, + repo_name=repo_name) + + def check_permissions(self, perm_defs, repo_name): + + try: + self._user_perms = set( + [perm_defs['repositories'][repo_name]] + ) + except KeyError: + log.warning(traceback.format_exc()) + return False + if self.required_perms.issubset(self._user_perms): + return True + return False + + +class HasRepoPermissionAnyApi(_BaseApiPerm): + def __call__(self, user, repo_name, check_location=''): + return super(HasRepoPermissionAnyApi, self)\ + .__call__(check_location=check_location, user=user, + repo_name=repo_name) + + def check_permissions(self, perm_defs, repo_name): + + try: + _user_perms = set( + [perm_defs['repositories'][repo_name]] + ) + except KeyError: + log.warning(traceback.format_exc()) + return False + if self.required_perms.intersection(_user_perms): + return True + return False + + def check_ip_access(source_ip, allowed_ips=None): """ Checks if source_ip is a subnet of any of allowed_ips.