Changeset - 9ddbfaeefb73
[Not reviewed]
beta
0 1 0
Marcin Kuzminski - 14 years ago 2012-02-21 00:18:00
marcin@python-works.com
API: allowed password field to be null when used with ldap_dn ref #362
- added errors about non unique email to create_user api function
1 file changed with 9 insertions and 5 deletions:
0 comments (0 inline, 0 general)
rhodecode/controllers/api/api.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.controllers.api
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    API controller for RhodeCode
 

	
 
    :created_on: Aug 20, 2011
 
    :author: marcink
 
    :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
# of the License or (at your opinion) any later version of the license.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 

	
 
import traceback
 
import logging
 

	
 
from sqlalchemy.orm.exc import NoResultFound
 

	
 
from rhodecode.controllers.api import JSONRPCController, JSONRPCError
 
from rhodecode.lib.auth import HasPermissionAllDecorator, \
 
    HasPermissionAnyDecorator
 
    HasPermissionAnyDecorator, PasswordGenerator
 

	
 
from rhodecode.model.meta import Session
 
from rhodecode.model.scm import ScmModel
 
from rhodecode.model.db import User, UsersGroup, RepoGroup, Repository
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.user import UserModel
 
from rhodecode.model.repo_permission import RepositoryPermissionModel
 
from rhodecode.model.users_group import UsersGroupModel
 
from rhodecode.model.repos_group import ReposGroupModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ApiController(JSONRPCController):
 
    """
 
    API Controller
 

	
 

	
 
    Each method needs to have USER as argument this is then based on given
 
    API_KEY propagated as instance of user object
 

	
 
    Preferably this should be first argument also
 

	
 

	
 
    Each function should also **raise** JSONRPCError for any
 
    errors that happens
 

	
 
    """
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def pull(self, apiuser, repo_name):
 
        """
 
        Dispatch pull action on given repo
 

	
 

	
 
        :param user:
 
        :param repo_name:
 
        """
 

	
 
        if Repository.is_valid(repo_name) is False:
 
            raise JSONRPCError('Unknown repo "%s"' % repo_name)
 

	
 
        try:
 
            ScmModel().pull_changes(repo_name, self.rhodecode_user.username)
 
            return 'Pulled from %s' % repo_name
 
        except Exception:
 
            raise JSONRPCError('Unable to pull changes from "%s"' % repo_name)
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def get_user(self, apiuser, username):
 
        """"
 
        Get a user by username
 

	
 
        :param apiuser:
 
        :param username:
 
        """
 

	
 
        user = User.get_by_username(username)
 
        if user is None:
 
            return user
 

	
 
        return dict(
 
            id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            active=user.active,
 
            admin=user.admin,
 
            ldap=user.ldap_dn
 
        )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def get_users(self, apiuser):
 
        """"
 
        Get all users
 

	
 
        :param apiuser:
 
        """
 

	
 
        result = []
 
        for user in User.getAll():
 
            result.append(
 
                dict(
 
                    id=user.user_id,
 
                    username=user.username,
 
                    firstname=user.name,
 
                    lastname=user.lastname,
 
                    email=user.email,
 
                    active=user.active,
 
                    admin=user.admin,
 
                    ldap=user.ldap_dn
 
                )
 
            )
 
        return result
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def create_user(self, apiuser, username, password, email, firstname=None,
 
    def create_user(self, apiuser, username, email, password, firstname=None,
 
                    lastname=None, active=True, admin=False, ldap_dn=None):
 
        """
 
        Create new user
 

	
 
        :param apiuser:
 
        :param username:
 
        :param password:
 
        :param email:
 
        :param name:
 
        :param lastname:
 
        :param active:
 
        :param admin:
 
        :param ldap_dn:
 
        """
 
        if User.get_by_username(username):
 
            raise JSONRPCError("user %s already exist" % username)
 

	
 
        if User.get_by_email(email, case_insensitive=True):
 
            raise JSONRPCError("email %s already exist" % email)
 

	
 
        if ldap_dn:
 
            # generate temporary password if ldap_dn
 
            password = PasswordGenerator().gen_password(length=8)
 

	
 
        try:
 
            usr = UserModel().create_or_update(
 
                username, password, email, firstname,
 
                lastname, active, admin, ldap_dn
 
            )
 
            Session.commit()
 
            return dict(
 
                id=usr.user_id,
 
                msg='created new user %s' % username
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create user %s' % username)
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def update_user(self, apiuser, username, password, email, firstname=None,
 
                    lastname=None, active=True, admin=False, ldap_dn=None):
 
        """
 
        Updates given user
 

	
 
        :param apiuser:
 
        :param username:
 
        :param password:
 
        :param email:
 
        :param name:
 
        :param lastname:
 
        :param active:
 
        :param admin:
 
        :param ldap_dn:
 
        """
 
        if not User.get_by_username(username):
 
            raise JSONRPCError("user %s does not exist" % username)
 

	
 
        try:
 
            usr = UserModel().create_or_update(
 
                username, password, email, firstname,
 
                lastname, active, admin, ldap_dn
 
            )
 
            Session.commit()
 
            return dict(
 
                id=usr.user_id,
 
                msg='updated user %s' % username
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update user %s' % username)
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def get_users_group(self, apiuser, group_name):
 
        """"
 
        Get users group by name
 

	
 
        :param apiuser:
 
        :param group_name:
 
        """
 

	
 
        users_group = UsersGroup.get_by_group_name(group_name)
 
        if not users_group:
 
            return None
 

	
 
        members = []
 
        for user in users_group.members:
 
            user = user.user
 
            members.append(dict(id=user.user_id,
 
                            username=user.username,
 
                            firstname=user.name,
 
                            lastname=user.lastname,
 
                            email=user.email,
 
                            active=user.active,
 
                            admin=user.admin,
 
                            ldap=user.ldap_dn))
 

	
 
        return dict(id=users_group.users_group_id,
 
                    group_name=users_group.users_group_name,
 
                    active=users_group.users_group_active,
 
                    members=members)
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def get_users_groups(self, apiuser):
 
        """"
 
        Get all users groups
 

	
 
        :param apiuser:
 
        """
 

	
 
        result = []
 
        for users_group in UsersGroup.getAll():
 
            members = []
 
            for user in users_group.members:
 
                user = user.user
 
                members.append(dict(id=user.user_id,
 
                                username=user.username,
 
                                firstname=user.name,
 
                                lastname=user.lastname,
 
                                email=user.email,
 
                                active=user.active,
 
                                admin=user.admin,
 
                                ldap=user.ldap_dn))
 

	
 
            result.append(dict(id=users_group.users_group_id,
 
                                group_name=users_group.users_group_name,
 
                                active=users_group.users_group_active,
 
                                members=members))
 
        return result
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def create_users_group(self, apiuser, group_name, active=True):
 
        """
 
        Creates an new usergroup
 

	
 
        :param group_name:
 
        :param active:
 
        """
 

	
 
        if self.get_users_group(apiuser, group_name):
 
            raise JSONRPCError("users group %s already exist" % group_name)
 

	
 
        try:
 
            ug = UsersGroupModel().create(name=group_name, active=active)
 
            Session.commit()
 
            return dict(id=ug.users_group_id,
 
                        msg='created new users group %s' % group_name)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create group %s' % group_name)
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def add_user_to_users_group(self, apiuser, group_name, username):
 
        """"
 
        Add a user to a group
 

	
 
        :param apiuser:
 
        :param group_name:
 
        :param username:
 
        """
 

	
 
        try:
 
            users_group = UsersGroup.get_by_group_name(group_name)
 
            if not users_group:
 
                raise JSONRPCError('unknown users group %s' % group_name)
 

	
 
            user = User.get_by_username(username)
 
            if user is None:
 
                raise JSONRPCError('unknown user %s' % username)
 

	
 
            ugm = UsersGroupModel().add_user_to_group(users_group, user)
 
            success = True if ugm != True else False
 
            msg = 'added member %s to users group %s' % (username, group_name)
 
            msg = msg if success else 'User is already in that group'
 
            Session.commit()
 

	
 
            return dict(
 
                id=ugm.users_group_member_id if ugm != True else None,
 
                success=success,
 
                msg=msg
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to add users group member')
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def remove_user_from_users_group(self, apiuser, group_name, username):
 
        """
 
        Remove user from a group
 

	
 
        :param apiuser
 
        :param group_name
 
        :param username
 
        """
 

	
 
        try:
 
            users_group = UsersGroup.get_by_group_name(group_name)
 
            if not users_group:
 
                raise JSONRPCError('unknown users group %s' % group_name)
 

	
 
            user = User.get_by_username(username)
 
            if user is None:
 
                raise JSONRPCError('unknown user %s' % username)
 

	
 
            success = UsersGroupModel().remove_user_from_group(users_group, user)
 
            msg = 'removed member %s from users group %s' % (username, group_name)
 
            msg = msg if success else "User wasn't in group"
 
            Session.commit()
 
            return dict(success=success, msg=msg)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to remove user from group')
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_repo(self, apiuser, repo_name):
 
        """"
 
        Get repository by name
0 comments (0 inline, 0 general)