Changeset - 72c4b2d720ea
[Not reviewed]
default
0 3 0
Mads Kiilerich - 6 years ago 2019-08-04 00:59:42
mads@kiilerich.com
Grafted from: f01f2674781e
flake8: fix some E712 comparison to True should be 'if cond is True:' or 'if cond:'

add_user_to_group is really odd ...

Note that the SqlAlchemy query API cause a lot of this kind of warnings.
3 files changed with 3 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/api/api.py
Show inline comments
 
@@ -805,193 +805,193 @@ class ApiController(JSONRPCController):
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update user group `%s`' % (usergroupid,))
 

	
 
    # permission check inside
 
    def delete_user_group(self, usergroupid):
 
        """
 
        Delete given user group by user group id or name.
 
        This command can be executed only using api_key
 
        belonging to user with admin rights or an admin of given user group
 

	
 
        :param usergroupid:
 
        :type usergroupid: int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg": "deleted user group ID:<user_group_id> <user_group_name>"
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete user group ID:<user_group_id> <user_group_name>"
 
            or
 
            "RepoGroup assigned to <repo_groups_list>"
 
          }
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            UserGroupModel().delete(user_group)
 
            Session().commit()
 
            return dict(
 
                msg='deleted user group ID:%s %s' %
 
                    (user_group.users_group_id, user_group.users_group_name),
 
                user_group=None
 
            )
 
        except UserGroupsAssignedException as e:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(str(e))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete user group ID:%s %s' %
 
                               (user_group.users_group_id,
 
                                user_group.users_group_name)
 
                               )
 

	
 
    # permission check inside
 
    def add_user_to_user_group(self, usergroupid, userid):
 
        """
 
        Adds a user to a user group. If user exists in that group success will be
 
        `false`. This command can be executed only using api_key
 
        belonging to user with admin rights  or an admin of given user group
 

	
 
        :param usergroupid:
 
        :type usergroupid: int
 
        :param userid:
 
        :type userid: int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
              "success": True|False # depends on if member is in group
 
              "msg": "added member `<username>` to user group `<groupname>` |
 
                      User is already in that group"
 

	
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to add member to user group `<user_group_name>`"
 
          }
 

	
 
        """
 
        user = get_user_or_error(userid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            ugm = UserGroupModel().add_user_to_group(user_group, user)
 
            success = True if ugm != True else False
 
            success = True if ugm is not True else False
 
            msg = 'added member `%s` to user group `%s`' % (
 
                user.username, user_group.users_group_name
 
            )
 
            msg = msg if success else 'User is already in that group'
 
            Session().commit()
 

	
 
            return dict(
 
                success=success,
 
                msg=msg
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to add member to user group `%s`' % (
 
                    user_group.users_group_name,
 
                )
 
            )
 

	
 
    # permission check inside
 
    def remove_user_from_user_group(self, usergroupid, userid):
 
        """
 
        Removes a user from a user group. If user is not in given group success will
 
        be `false`. This command can be executed only
 
        using api_key belonging to user with admin rights or an admin of given user group
 

	
 
        :param usergroupid:
 
        :param userid:
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "success":  True|False,  # depends on if member is in group
 
                      "msg": "removed member <username> from user group <groupname> |
 
                              User wasn't in group"
 
                    }
 
            error:  null
 

	
 
        """
 
        user = get_user_or_error(userid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            success = UserGroupModel().remove_user_from_group(user_group, user)
 
            msg = 'removed member `%s` from user group `%s`' % (
 
                user.username, user_group.users_group_name
 
            )
 
            msg = msg if success else "User wasn't in group"
 
            Session().commit()
 
            return dict(success=success, msg=msg)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to remove member from user group `%s`' % (
 
                    user_group.users_group_name,
 
                )
 
            )
 

	
 
    # permission check inside
 
    def get_repo(self, repoid,
 
                 with_revision_names=Optional(False),
 
                 with_pullrequests=Optional(False)):
 
        """
 
        Gets an existing repository by it's name or repository_id. Members will return
 
        either users_group or user associated to that repository. This command can be
 
        executed only using api_key belonging to user with admin
 
        rights or regular user that have at least read access to repository.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "enable_downloads":  "<bool>",
 
                "enable_statistics": "<bool>",
 
                "private":           "<bool>",
 
                "created_on" :       "<date_time_created>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "last_changeset":    {
 
                                       "author":   "<full_author>",
 
                                       "date":     "<date_time_of_commit>",
 
                                       "message":  "<commit_message>",
 
                                       "raw_id":   "<raw_id>",
 
                                       "revision": "<numeric_revision>",
kallithea/lib/recaptcha.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import json
 
import urllib
 
import urllib2
 

	
 

	
 
class RecaptchaResponse(object):
 
    def __init__(self, is_valid, error_code=None):
 
        self.is_valid = is_valid
 
        self.error_code = error_code
 

	
 
    def __repr__(self):
 
        return '<RecaptchaResponse:%s>' % (self.is_valid)
 

	
 

	
 
def submit(g_recaptcha_response, private_key, remoteip):
 
    """
 
    Submits a reCAPTCHA request for verification. Returns RecaptchaResponse for the request
 

	
 
    g_recaptcha_response -- The value of g_recaptcha_response from the form
 
    private_key -- your reCAPTCHA private key
 
    remoteip -- the user's IP address
 
    """
 

	
 
    if not (g_recaptcha_response and len(g_recaptcha_response)):
 
        return RecaptchaResponse(is_valid=False, error_code='incorrect-captcha-sol')
 

	
 
    def encode_if_necessary(s):
 
        if isinstance(s, unicode):
 
            return s.encode('utf-8')
 
        return s
 

	
 
    params = urllib.urlencode({
 
        'secret': encode_if_necessary(private_key),
 
        'remoteip': encode_if_necessary(remoteip),
 
        'response': encode_if_necessary(g_recaptcha_response),
 
    })
 

	
 
    req = urllib2.Request(
 
        url="https://www.google.com/recaptcha/api/siteverify",
 
        data=params,
 
        headers={
 
            "Content-type": "application/x-www-form-urlencoded",
 
            "User-agent": "reCAPTCHA Python"
 
        }
 
    )
 

	
 
    httpresp = urllib2.urlopen(req)
 
    return_values = json.loads(httpresp.read())
 
    httpresp.close()
 

	
 
    if not (isinstance(return_values, dict)):
 
        return RecaptchaResponse(is_valid=False, error_code='incorrect-captcha-sol')
 
    elif (("success" in return_values) and ((return_values["success"] == True) or (return_values["success"] == "true"))):
 
    elif (("success" in return_values) and ((return_values["success"] is True) or (return_values["success"] == "true"))):
 
        return RecaptchaResponse(is_valid=True)
 
    elif (("error-codes" in return_values) and isinstance(return_values["error-codes"], list) and (len(return_values["error-codes"]) > 0)):
 
        return RecaptchaResponse(is_valid=False, error_code=return_values["error-codes"][0])
 
    else:
 
        return RecaptchaResponse(is_valid=False, error_code='incorrect-captcha-sol')
kallithea/model/user_group.py
Show inline comments
 
@@ -73,192 +73,193 @@ class UserGroupModel(object):
 
                # check if we have permissions to alter this usergroup's access
 
                if HasUserGroupPermissionLevel('read')(member):
 
                    self.grant_user_group_permission(
 
                        target_user_group=user_group, user_group=member, perm=perm
 
                    )
 
        # set new permissions
 
        for member, perm, member_type in perms_new:
 
            if member_type == 'user':
 
                self.grant_user_permission(
 
                    user_group=user_group, user=member, perm=perm
 
                )
 
            else:
 
                # check if we have permissions to alter this usergroup's access
 
                if HasUserGroupPermissionLevel('read')(member):
 
                    self.grant_user_group_permission(
 
                        target_user_group=user_group, user_group=member, perm=perm
 
                    )
 

	
 
    def get(self, user_group_id, cache=False):
 
        return UserGroup.get(user_group_id)
 

	
 
    def get_group(self, user_group):
 
        return UserGroup.guess_instance(user_group)
 

	
 
    def get_by_name(self, name, cache=False, case_insensitive=False):
 
        return UserGroup.get_by_group_name(name, cache, case_insensitive)
 

	
 
    def create(self, name, description, owner, active=True, group_data=None):
 
        try:
 
            new_user_group = UserGroup()
 
            new_user_group.owner = User.guess_instance(owner)
 
            new_user_group.users_group_name = name
 
            new_user_group.user_group_description = description
 
            new_user_group.users_group_active = active
 
            if group_data:
 
                new_user_group.group_data = group_data
 
            Session().add(new_user_group)
 
            self._create_default_perms(new_user_group)
 

	
 
            self.grant_user_permission(user_group=new_user_group,
 
                                       user=owner, perm='usergroup.admin')
 

	
 
            return new_user_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def update(self, user_group, form_data):
 

	
 
        try:
 
            user_group = UserGroup.guess_instance(user_group)
 

	
 
            for k, v in form_data.items():
 
                if k == 'users_group_members':
 
                    members_list = []
 
                    if v:
 
                        v = [v] if isinstance(v, basestring) else v
 
                        for u_id in set(v):
 
                            member = UserGroupMember(user_group.users_group_id, u_id)
 
                            members_list.append(member)
 
                            Session().add(member)
 
                    user_group.members = members_list
 
                setattr(user_group, k, v)
 

	
 
            # Flush to make db assign users_group_member_id to newly
 
            # created UserGroupMembers.
 
            Session().flush()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, user_group, force=False):
 
        """
 
        Deletes user group, unless force flag is used
 
        raises exception if there are members in that group, else deletes
 
        group and users
 

	
 
        :param user_group:
 
        :param force:
 
        """
 
        user_group = UserGroup.guess_instance(user_group)
 
        try:
 
            # check if this group is not assigned to repo
 
            assigned_groups = UserGroupRepoToPerm.query() \
 
                .filter(UserGroupRepoToPerm.users_group == user_group).all()
 
            assigned_groups = [x.repository.repo_name for x in assigned_groups]
 

	
 
            if assigned_groups and not force:
 
                raise UserGroupsAssignedException(
 
                    'User Group assigned to %s' % ", ".join(assigned_groups))
 
            Session().delete(user_group)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def add_user_to_group(self, user_group, user):
 
        """Return True if user already is in the group - else return the new UserGroupMember"""
 
        user_group = UserGroup.guess_instance(user_group)
 
        user = User.guess_instance(user)
 

	
 
        for m in user_group.members:
 
            u = m.user
 
            if u.user_id == user.user_id:
 
                # user already in the group, skip
 
                return True
 

	
 
        try:
 
            user_group_member = UserGroupMember()
 
            user_group_member.user = user
 
            user_group_member.users_group = user_group
 

	
 
            user_group.members.append(user_group_member)
 
            user.group_member.append(user_group_member)
 

	
 
            Session().add(user_group_member)
 
            return user_group_member
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def remove_user_from_group(self, user_group, user):
 
        user_group = UserGroup.guess_instance(user_group)
 
        user = User.guess_instance(user)
 

	
 
        user_group_member = None
 
        for m in user_group.members:
 
            if m.user_id == user.user_id:
 
                # Found this user's membership row
 
                user_group_member = m
 
                break
 

	
 
        if user_group_member:
 
            try:
 
                Session().delete(user_group_member)
 
                return True
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise
 
        else:
 
            # User isn't in that group
 
            return False
 

	
 
    def has_perm(self, user_group, perm):
 
        user_group = UserGroup.guess_instance(user_group)
 
        perm = Permission.guess_instance(perm)
 

	
 
        return UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == user_group) \
 
            .filter(UserGroupToPerm.permission == perm).scalar() is not None
 

	
 
    def grant_perm(self, user_group, perm):
 
        user_group = UserGroup.guess_instance(user_group)
 
        perm = Permission.guess_instance(perm)
 

	
 
        # if this permission is already granted skip it
 
        _perm = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == user_group) \
 
            .filter(UserGroupToPerm.permission == perm) \
 
            .scalar()
 
        if _perm:
 
            return
 

	
 
        new = UserGroupToPerm()
 
        new.users_group = user_group
 
        new.permission = perm
 
        Session().add(new)
 
        return new
 

	
 
    def revoke_perm(self, user_group, perm):
 
        user_group = UserGroup.guess_instance(user_group)
 
        perm = Permission.guess_instance(perm)
 

	
 
        obj = UserGroupToPerm.query() \
 
            .filter(UserGroupToPerm.users_group == user_group) \
 
            .filter(UserGroupToPerm.permission == perm).scalar()
 
        if obj is not None:
 
            Session().delete(obj)
 

	
 
    def grant_user_permission(self, user_group, user, perm):
 
        """
 
        Grant permission for user on given user group, or update
 
        existing one if found
 

	
 
        :param user_group: Instance of UserGroup, users_group_id,
 
            or users_group_name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 

	
 
        user_group = UserGroup.guess_instance(user_group)
 
        user = User.guess_instance(user)
 
        permission = Permission.guess_instance(perm)
 

	
0 comments (0 inline, 0 general)