@@ -1651,769 +1651,769 @@ class ApiController(JSONRPCController):
if HasPermissionAnyApi('hg.admin')(user=apiuser):
pass
elif HasRepoPermissionAnyApi('repository.admin',
'repository.write',
'repository.read')(user=apiuser,
repo_name=repo.repo_name):
if not isinstance(owner, Optional):
# forbid setting owner for non-admins
raise JSONRPCError(
'Only Kallithea admin can specify `owner` param'
)
if not HasPermissionAnyApi('hg.create.repository')(user=apiuser):
raise JSONRPCError('no permission to create repositories')
else:
raise JSONRPCError('repository `%s` does not exist' % (repoid,))
if isinstance(owner, Optional):
owner = apiuser.user_id
owner = get_user_or_error(owner)
try:
# create structure of groups and return the last group
group = map_groups(fork_name)
fork_base_name = fork_name.rsplit('/', 1)[-1]
form_data = dict(
repo_name=fork_base_name,
repo_name_full=fork_name,
repo_group=group,
repo_type=repo.repo_type,
description=Optional.extract(description),
private=Optional.extract(private),
copy_permissions=Optional.extract(copy_permissions),
landing_rev=Optional.extract(landing_rev),
update_after_clone=False,
fork_parent_id=repo.repo_id,
task = RepoModel().create_fork(form_data, cur_user=owner)
# no commit, it's done in RepoModel, or async via celery
from celery.result import BaseAsyncResult
task_id = None
if isinstance(task, BaseAsyncResult):
task_id = task.task_id
return dict(
msg='Created fork of `%s` as `%s`' % (repo.repo_name,
fork_name),
success=True, # cannot return the repo data here since fork
# can be done async
task=task_id
except Exception:
log.error(traceback.format_exc())
'failed to fork repository `%s` as `%s`' % (repo_name,
fork_name)
# permission check inside
def delete_repo(self, apiuser, repoid, forks=Optional('')):
"""
Deletes a repository. This command can be executed only using api_key belonging
to user with admin rights or regular user that have admin access to repository.
When `forks` param is set it's possible to detach or delete forks of deleting
repository
:param apiuser: filled automatically from apikey
:type apiuser: AuthUser
:param repoid: repository name or repository id
:type repoid: str or int
:param forks: `detach` or `delete`, what do do with attached forks for repo
:type forks: Optional(str)
OUTPUT::
id : <id_given_in_input>
result: {
"msg": "Deleted repository `<reponame>`",
"success": true
}
error: null
repo = get_repo_or_error(repoid)
if not HasPermissionAnyApi('hg.admin')(user=apiuser):
# check if we have admin permission for this repo !
if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
handle_forks = Optional.extract(forks)
_forks_msg = ''
_forks = [f for f in repo.forks]
if handle_forks == 'detach':
_forks_msg = ' ' + 'Detached %s forks' % len(_forks)
elif handle_forks == 'delete':
_forks_msg = ' ' + 'Deleted %s forks' % len(_forks)
elif _forks:
'Cannot delete `%s` it still contains attached forks' %
(repo.repo_name,)
RepoModel().delete(repo, forks=forks)
Session().commit()
msg='Deleted repository `%s`%s' % (repo.repo_name, _forks_msg),
success=True
'failed to delete repository `%s`' % (repo.repo_name,)
@HasPermissionAnyDecorator('hg.admin')
def grant_user_permission(self, apiuser, repoid, userid, perm):
Grant permission for user on given repository, or update existing one
if found. This command can be executed only using api_key belonging to user
with admin rights.
:param userid:
:param perm: (repository.(none|read|write|admin))
:type perm: str
"msg" : "Granted perm: `<perm>` for user: `<username>` in repo: `<reponame>`",
user = get_user_or_error(userid)
perm = get_perm_or_error(perm)
RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
perm.permission_name, user.username, repo.repo_name
),
'failed to edit permission for user: `%s` in repo: `%s`' % (
userid, repoid
def revoke_user_permission(self, apiuser, repoid, userid):
Revoke permission for user on given repository. This command can be executed
only using api_key belonging to user with admin rights.
"msg" : "Revoked perm for user: `<username>` in repo: `<reponame>`",
RepoModel().revoke_user_permission(repo=repo, user=user)
msg='Revoked perm for user: `%s` in repo: `%s`' % (
user.username, repo.repo_name
def grant_user_group_permission(self, apiuser, repoid, usergroupid, perm):
Grant permission for user group on given repository, or update
existing one if found. This command can be executed only using
api_key belonging to user with admin rights.
:param usergroupid: id of usergroup
:type usergroupid: str or int
result : {
"msg" : "Granted perm: `<perm>` for group: `<usersgroupname>` in repo: `<reponame>`",
error : null
ERROR OUTPUT::
result : null
error : {
"failed to edit permission for user group: `<usergroup>` in repo `<repo>`'
user_group = get_user_group_or_error(usergroupid)
_perms = ('repository.admin',)
if not HasRepoPermissionAnyApi(*_perms)(
user=apiuser, repo_name=repo.repo_name):
# check if we have at least read permission for this user group !
_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
if not HasUserGroupPermissionAny(*_perms)(
user=apiuser, user_group_name=user_group.users_group_name):
raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
RepoModel().grant_user_group_permission(
repo=repo, group_name=user_group, perm=perm)
msg='Granted perm: `%s` for user group: `%s` in '
'repo: `%s`' % (
perm.permission_name, user_group.users_group_name,
repo.repo_name
'failed to edit permission for user group: `%s` in '
usergroupid, repo.repo_name
def revoke_user_group_permission(self, apiuser, repoid, usergroupid):
Revoke permission for user group on given repository. This command can be
executed only using api_key belonging to user with admin rights.
:param usergroupid:
"msg" : "Revoked perm for group: `<usersgroupname>` in repo: `<reponame>`",
RepoModel().revoke_user_group_permission(
repo=repo, group_name=user_group)
msg='Revoked perm for user group: `%s` in repo: `%s`' % (
user_group.users_group_name, repo.repo_name
def get_repo_group(self, apiuser, repogroupid):
Returns given repo group together with permissions, and repositories
inside the group
:param repogroupid: id/name of repository group
:type repogroupid: str or int
repo_group = get_repo_group_or_error(repogroupid)
members = []
for user in repo_group.repo_group_to_perm:
perm = user.permission.permission_name
user = user.user
user_data = {
'name': user.username,
'type': "user",
'permission': perm
members.append(user_data)
for user_group in repo_group.users_group_to_perm:
perm = user_group.permission.permission_name
user_group = user_group.users_group
user_group_data = {
'name': user_group.users_group_name,
'type': "user_group",
members.append(user_group_data)
data = repo_group.get_api_data()
data["members"] = members
return data
def get_repo_groups(self, apiuser):
Returns all repository groups
result = []
for repo_group in RepoGroupModel().get_all():
for repo_group in RepoGroup.get_all():
result.append(repo_group.get_api_data())
return result
def create_repo_group(self, apiuser, group_name, description=Optional(''),
owner=Optional(OAttr('apiuser')),
parent=Optional(None),
copy_permissions=Optional(False)):
Creates a repository group. This command can be executed only using
:param group_name:
:type group_name:
:param description:
:type description:
:param owner:
:type owner:
:param parent:
:type parent:
:param copy_permissions:
:type copy_permissions:
"msg": "created new repo group `<repo_group_name>`"
"repo_group": <repogroup_object>
failed to create repo group `<repogroupid>`
if RepoGroup.get_by_group_name(group_name):
raise JSONRPCError("repo group `%s` already exist" % (group_name,))
group_description = Optional.extract(description)
parent_group = Optional.extract(parent)
if not isinstance(parent, Optional):
parent_group = get_repo_group_or_error(parent_group)
copy_permissions = Optional.extract(copy_permissions)
repo_group = RepoGroupModel().create(
group_name=group_name,
group_description=group_description,
owner=owner,
parent=parent_group,
copy_permissions=copy_permissions
msg='created new repo group `%s`' % group_name,
repo_group=repo_group.get_api_data()
raise JSONRPCError('failed to create repo group `%s`' % (group_name,))
def update_repo_group(self, apiuser, repogroupid, group_name=Optional(''),
description=Optional(''),
parent=Optional(None), enable_locking=Optional(False)):
updates = {}
store_update(updates, group_name, 'group_name')
store_update(updates, description, 'group_description')
store_update(updates, owner, 'owner')
store_update(updates, parent, 'parent_group')
store_update(updates, enable_locking, 'enable_locking')
repo_group = RepoGroupModel().update(repo_group, updates)
msg='updated repository group ID:%s %s' % (repo_group.group_id,
repo_group.group_name),
raise JSONRPCError('failed to update repository group `%s`'
% (repogroupid,))
def delete_repo_group(self, apiuser, repogroupid):
:param repogroupid: name or id of repository group
'msg': 'deleted repo group ID:<repogroupid> <repogroupname>
'repo_group': null
"failed to delete repo group ID:<repogroupid> <repogroupname>"
RepoGroupModel().delete(repo_group)
msg='deleted repo group ID:%s %s' %
(repo_group.group_id, repo_group.group_name),
repo_group=None
raise JSONRPCError('failed to delete repo group ID:%s %s' %
(repo_group.group_id, repo_group.group_name)
def grant_user_permission_to_repo_group(self, apiuser, repogroupid, userid,
perm, apply_to_children=Optional('none')):
Grant permission for user on given repository group, or update existing
one if found. This command can be executed only using api_key belonging
to user with admin rights, or user who has admin right to given repository
group.
:param perm: (group.(none|read|write|admin))
:param apply_to_children: 'none', 'repos', 'groups', 'all'
:type apply_to_children: str
"msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
"failed to edit permission for user: `<userid>` in repo group: `<repo_group_name>`"
# check if we have admin permission for this repo group !
if not HasRepoGroupPermissionAnyApi('group.admin')(user=apiuser,
group_name=repo_group.group_name):
raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
perm = get_perm_or_error(perm, prefix='group.')
apply_to_children = Optional.extract(apply_to_children)
RepoGroupModel().add_permission(repo_group=repo_group,
obj=user,
obj_type="user",
perm=perm,
recursive=apply_to_children)
msg='Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
perm.permission_name, apply_to_children, user.username, repo_group.name
'failed to edit permission for user: `%s` in repo group: `%s`' % (
userid, repo_group.name))
def revoke_user_permission_from_repo_group(self, apiuser, repogroupid, userid,
apply_to_children=Optional('none')):
Revoke permission for user on given repository group. This command can
be executed only using api_key belonging to user with admin rights, or
user who has admin right to given repository group.
:type userid:
"msg" : "Revoked perm (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
RepoGroupModel().delete_permission(repo_group=repo_group,
msg='Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
apply_to_children, user.username, repo_group.name
def grant_user_group_permission_to_repo_group(
self, apiuser, repogroupid, usergroupid, perm,
Grant permission for user group on given repository group, or update
api_key belonging to user with admin rights, or user who has admin
right to given repository group.
"msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
"failed to edit permission for user group: `<usergroup>` in repo group: `<repo_group_name>`"
_perms = ('group.admin',)
if not HasRepoGroupPermissionAnyApi(*_perms)(
user=apiuser, group_name=repo_group.group_name):
'repository group `%s` does not exist' % (repogroupid,))
'user group `%s` does not exist' % (usergroupid,))
obj=user_group,
obj_type="user_group",
msg='Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
perm.permission_name, apply_to_children,
user_group.users_group_name, repo_group.name
'repo group: `%s`' % (
usergroupid, repo_group.name
def revoke_user_group_permission_from_repo_group(
self, apiuser, repogroupid, usergroupid,
executed only using api_key belonging to user with admin rights, or
"msg" : "Revoked perm (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
Status change: