diff --git a/rhodecode/tests/fixture.py b/rhodecode/tests/fixture.py --- a/rhodecode/tests/fixture.py +++ b/rhodecode/tests/fixture.py @@ -1,12 +1,37 @@ +# -*- coding: utf-8 -*- +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + """ Helpers for fixture generation """ +import os +import time from rhodecode.tests import * from rhodecode.model.db import Repository, User, RepoGroup, UserGroup from rhodecode.model.meta import Session from rhodecode.model.repo import RepoModel -from rhodecode.model.repos_group import ReposGroupModel -from rhodecode.model.users_group import UserGroupModel +from rhodecode.model.user import UserModel +from rhodecode.model.repo_group import RepoGroupModel +from rhodecode.model.user_group import UserGroupModel +from rhodecode.model.gist import GistModel + +dn = os.path.dirname +FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'tests', 'fixtures') + + +def error_function(*args, **kwargs): + raise Exception('Total Crash !') class Fixture(object): @@ -14,20 +39,52 @@ class Fixture(object): def __init__(self): pass + def anon_access(self, status): + """ + Context process for disabling anonymous access. use like: + fixture = Fixture() + with fixture.anon_access(False): + #tests + + after this block anon access will be set to `not status` + """ + + class context(object): + def __enter__(self): + anon = User.get_default_user() + anon.active = status + Session().add(anon) + Session().commit() + time.sleep(1.5) # must sleep for cache (1s to expire) + + def __exit__(self, exc_type, exc_val, exc_tb): + anon = User.get_default_user() + anon.active = not status + Session().add(anon) + Session().commit() + + return context() + def _get_repo_create_params(self, **custom): defs = dict( repo_name=None, repo_type='hg', clone_uri='', - repo_group='', + repo_group='-1', repo_description='DESC', repo_private=False, - repo_landing_rev='tip' + repo_landing_rev='rev:tip', + repo_copy_permissions=False, + repo_state=Repository.STATE_CREATED, ) defs.update(custom) if 'repo_name_full' not in custom: defs.update({'repo_name_full': defs['repo_name']}) + # fix the repo name if passed as repo_name_full + if defs['repo_name']: + defs['repo_name'] = defs['repo_name'].split('/')[-1] + return defs def _get_group_create_params(self, **custom): @@ -44,10 +101,28 @@ class Fixture(object): return defs + def _get_user_create_params(self, name, **custom): + defs = dict( + username=name, + password='qweqwe', + email='%s+test@rhodecode.org' % name, + firstname='TestUser', + lastname='Test', + active=True, + admin=False, + extern_type='rhodecode', + extern_name=None + ) + defs.update(custom) + + return defs + def _get_user_group_create_params(self, name, **custom): defs = dict( users_group_name=name, + user_group_description='DESC', users_group_active=True, + user_group_data={}, ) defs.update(custom) @@ -60,10 +135,8 @@ class Fixture(object): if r: return r - if isinstance(kwargs.get('repos_group'), RepoGroup): - #TODO: rename the repos_group ! - kwargs['repo_group'] = kwargs['repos_group'].group_id - del kwargs['repos_group'] + if isinstance(kwargs.get('repo_group'), RepoGroup): + kwargs['repo_group'] = kwargs['repo_group'].group_id form_data = self._get_repo_create_params(repo_name=name, **kwargs) cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) @@ -92,11 +165,11 @@ class Fixture(object): assert r return r - def destroy_repo(self, repo_name): - RepoModel().delete(repo_name) + def destroy_repo(self, repo_name, **kwargs): + RepoModel().delete(repo_name, **kwargs) Session().commit() - def create_group(self, name, **kwargs): + def create_repo_group(self, name, **kwargs): if 'skip_if_exists' in kwargs: del kwargs['skip_if_exists'] gr = RepoGroup.get_by_group_name(group_name=name) @@ -104,13 +177,34 @@ class Fixture(object): return gr form_data = self._get_group_create_params(group_name=name, **kwargs) owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) - gr = ReposGroupModel().create(group_name=form_data['group_name'], - group_description=form_data['group_name'], - owner=owner, parent=form_data['group_parent_id']) + gr = RepoGroupModel().create( + group_name=form_data['group_name'], + group_description=form_data['group_name'], + owner=owner, parent=form_data['group_parent_id']) Session().commit() gr = RepoGroup.get_by_group_name(gr.group_name) return gr + def destroy_repo_group(self, repogroupid): + RepoGroupModel().delete(repogroupid) + Session().commit() + + def create_user(self, name, **kwargs): + if 'skip_if_exists' in kwargs: + del kwargs['skip_if_exists'] + user = User.get_by_username(name) + if user: + return user + form_data = self._get_user_create_params(name, **kwargs) + user = UserModel().create(form_data) + Session().commit() + user = User.get_by_username(user.username) + return user + + def destroy_user(self, userid): + UserModel().delete(userid) + Session().commit() + def create_user_group(self, name, **kwargs): if 'skip_if_exists' in kwargs: del kwargs['skip_if_exists'] @@ -119,8 +213,50 @@ class Fixture(object): return gr form_data = self._get_user_group_create_params(name, **kwargs) owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN) - user_group = UserGroupModel().create(name=form_data['users_group_name'], - owner=owner, active=form_data['users_group_active']) + user_group = UserGroupModel().create( + name=form_data['users_group_name'], + description=form_data['user_group_description'], + owner=owner, active=form_data['users_group_active'], + group_data=form_data['user_group_data']) Session().commit() user_group = UserGroup.get_by_group_name(user_group.users_group_name) return user_group + + def destroy_user_group(self, usergroupid): + UserGroupModel().delete(user_group=usergroupid, force=True) + Session().commit() + + def create_gist(self, **kwargs): + form_data = { + 'description': 'new-gist', + 'owner': TEST_USER_ADMIN_LOGIN, + 'gist_type': GistModel.cls.GIST_PUBLIC, + 'lifetime': -1, + 'gist_mapping': {'filename1.txt':{'content':'hello world'},} + } + form_data.update(kwargs) + gist = GistModel().create( + description=form_data['description'],owner=form_data['owner'], + gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'], + lifetime=form_data['lifetime'] + ) + Session().commit() + + return gist + + def destroy_gists(self, gistid=None): + for g in GistModel.cls.get_all(): + if gistid: + if gistid == g.gist_access_id: + GistModel().delete(g) + else: + GistModel().delete(g) + Session().commit() + + def load_resource(self, resource_name, strip=True): + with open(os.path.join(FIXTURES, resource_name)) as f: + source = f.read() + if strip: + source = source.strip() + + return source