Changeset - 04dee6fdfdff
[Not reviewed]
default
0 1 0
Thomas De Schampheleire - 6 years ago 2019-11-27 20:30:56
thomas.de_schampheleire@nokia.com
bin/ldap_sync: fix isort difference detected by run-all-cleanup
1 file changed with 0 insertions and 1 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/ldap_sync.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.bin.ldap_sync
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
LDAP sync script
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 06, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import urllib2
 
import uuid
 
from ConfigParser import ConfigParser
 

	
 
import ldap
 

	
 
from kallithea.lib.compat import json
 

	
 

	
 
config = ConfigParser()
 
config.read('ldap_sync.conf')
 

	
 

	
 
class InvalidResponseIDError(Exception):
 
    """ Request and response don't have the same UUID. """
 

	
 

	
 
class ResponseError(Exception):
 
    """ Response has an error, something went wrong with request execution. """
 

	
 

	
 
class UserAlreadyInGroupError(Exception):
 
    """ User is already a member of the target group. """
 

	
 

	
 
class UserNotInGroupError(Exception):
 
    """ User is not a member of the target group. """
 

	
 

	
 
class API(object):
 

	
 
    def __init__(self, url, key):
 
        self.url = url
 
        self.key = key
 

	
 
    def get_api_data(self, uid, method, args):
 
        """Prepare dict for API post."""
 
        return {
 
            "id": uid,
 
            "api_key": self.key,
 
            "method": method,
 
            "args": args
 
        }
 

	
 
    def post(self, method, args):
 
        """Send a generic API post to Kallithea.
 

	
 
        This will generate the UUID for validation check after the
 
        response is returned. Handle errors and get the result back.
 
        """
 
        uid = str(uuid.uuid1())
 
        data = self.get_api_data(uid, method, args)
 

	
 
        data = json.dumps(data)
 
        headers = {'content-type': 'text/plain'}
 
        req = urllib2.Request(self.url, data, headers)
 

	
 
        response = urllib2.urlopen(req)
 
        response = json.load(response)
 

	
 
        if uid != response["id"]:
 
            raise InvalidResponseIDError("UUID does not match.")
 

	
 
        if response["error"] is not None:
 
            raise ResponseError(response["error"])
 

	
 
        return response["result"]
 

	
 
    def create_group(self, name, active=True):
 
        """Create the Kallithea user group."""
 
        args = {
 
            "group_name": name,
 
            "active": str(active)
 
        }
 
        self.post("create_user_group", args)
 

	
 
    def add_membership(self, group, username):
 
        """Add specific user to a group."""
 
        args = {
 
            "usersgroupid": group,
 
            "userid": username
 
        }
 
        result = self.post("add_user_to_user_group", args)
 
        if not result["success"]:
 
            raise UserAlreadyInGroupError("User %s already in group %s." %
 
                                          (username, group))
 

	
 
    def remove_membership(self, group, username):
 
        """Remove specific user from a group."""
 
        args = {
 
            "usersgroupid": group,
 
            "userid": username
 
        }
 
        result = self.post("remove_user_from_user_group", args)
 
        if not result["success"]:
 
            raise UserNotInGroupError("User %s not in group %s." %
 
                                      (username, group))
 

	
 
    def get_group_members(self, name):
 
        """Get the list of member usernames from a user group."""
 
        args = {"usersgroupid": name}
 
        members = self.post("get_user_group", args)['members']
0 comments (0 inline, 0 general)