Files
@ 1e5bb8ed77d6
Branch filter:
Location: kallithea/rhodecode/tests/fixture.py
1e5bb8ed77d6
4.4 KiB
text/x-python
index: always use lightweight - there shouldn't be any reason not to
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 | """
Helpers for fixture generation
"""
from rhodecode.tests import *
from rhodecode.model.db import Repository, User, RepoGroup, UserGroup
from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.repos_group import ReposGroupModel
from rhodecode.model.users_group import UserGroupModel
class Fixture(object):
def __init__(self):
pass
def _get_repo_create_params(self, **custom):
defs = dict(
repo_name=None,
repo_type='hg',
clone_uri='',
repo_group='',
repo_description='DESC',
repo_private=False,
repo_landing_rev='tip'
)
defs.update(custom)
if 'repo_name_full' not in custom:
defs.update({'repo_name_full': defs['repo_name']})
return defs
def _get_group_create_params(self, **custom):
defs = dict(
group_name=None,
group_description='DESC',
group_parent_id=None,
perms_updates=[],
perms_new=[],
enable_locking=False,
recursive=False
)
defs.update(custom)
return defs
def _get_user_group_create_params(self, name, **custom):
defs = dict(
users_group_name=name,
users_group_active=True,
)
defs.update(custom)
return defs
def create_repo(self, name, **kwargs):
if 'skip_if_exists' in kwargs:
del kwargs['skip_if_exists']
r = Repository.get_by_repo_name(name)
if r:
return r
if isinstance(kwargs.get('repos_group'), RepoGroup):
#TODO: rename the repos_group !
kwargs['repo_group'] = kwargs['repos_group'].group_id
del kwargs['repos_group']
form_data = self._get_repo_create_params(repo_name=name, **kwargs)
cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
RepoModel().create(form_data, cur_user)
Session().commit()
return Repository.get_by_repo_name(name)
def create_fork(self, repo_to_fork, fork_name, **kwargs):
repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
form_data = self._get_repo_create_params(repo_name=fork_name,
fork_parent_id=repo_to_fork,
repo_type=repo_to_fork.repo_type,
**kwargs)
form_data['update_after_clone'] = False
#TODO: fix it !!
form_data['description'] = form_data['repo_description']
form_data['private'] = form_data['repo_private']
form_data['landing_rev'] = form_data['repo_landing_rev']
owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
RepoModel().create_fork(form_data, cur_user=owner)
Session().commit()
r = Repository.get_by_repo_name(fork_name)
assert r
return r
def destroy_repo(self, repo_name):
RepoModel().delete(repo_name)
Session().commit()
def create_group(self, name, **kwargs):
if 'skip_if_exists' in kwargs:
del kwargs['skip_if_exists']
gr = RepoGroup.get_by_group_name(group_name=name)
if gr:
return gr
form_data = self._get_group_create_params(group_name=name, **kwargs)
owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
gr = ReposGroupModel().create(group_name=form_data['group_name'],
group_description=form_data['group_name'],
owner=owner, parent=form_data['group_parent_id'])
Session().commit()
gr = RepoGroup.get_by_group_name(gr.group_name)
return gr
def create_user_group(self, name, **kwargs):
if 'skip_if_exists' in kwargs:
del kwargs['skip_if_exists']
gr = UserGroup.get_by_group_name(group_name=name)
if gr:
return gr
form_data = self._get_user_group_create_params(name, **kwargs)
owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
user_group = UserGroupModel().create(name=form_data['users_group_name'],
owner=owner, active=form_data['users_group_active'])
Session().commit()
user_group = UserGroup.get_by_group_name(user_group.users_group_name)
return user_group
|