Files @ e61a656b44bd
Branch filter:

Location: kallithea/rhodecode/tests/fixture.py

Mads Kiilerich
html: move "Submit a bug" to make it more clear that it is for RhodeCode, not the repo

RhodeCode _could_ contain a bug tracker and this link _could_ be for filing
bugs for the hosted projects.

Moving the link to the RhodeCode info makes it more clear that it is for
RhodeCode bugs.

The server instance is however something local, not directly related to the
upstream.
"""
Helpers for fixture generation
"""
from rhodecode.tests import *
from rhodecode.model.db import Repository, User, RepoGroup, UserGroup
from rhodecode.model.meta import Session
from rhodecode.model.repo import RepoModel
from rhodecode.model.repos_group import ReposGroupModel
from rhodecode.model.users_group import UserGroupModel


class Fixture(object):

    def __init__(self):
        pass

    def _get_repo_create_params(self, **custom):
        defs = dict(
            repo_name=None,
            repo_type='hg',
            clone_uri='',
            repo_group='',
            repo_description='DESC',
            repo_private=False,
            repo_landing_rev='tip'
        )
        defs.update(custom)
        if 'repo_name_full' not in custom:
            defs.update({'repo_name_full': defs['repo_name']})

        return defs

    def _get_group_create_params(self, **custom):
        defs = dict(
            group_name=None,
            group_description='DESC',
            group_parent_id=None,
            perms_updates=[],
            perms_new=[],
            enable_locking=False,
            recursive=False
        )
        defs.update(custom)

        return defs

    def _get_user_group_create_params(self, name, **custom):
        defs = dict(
            users_group_name=name,
            users_group_active=True,
        )
        defs.update(custom)

        return defs

    def create_repo(self, name, **kwargs):
        if 'skip_if_exists' in kwargs:
            del kwargs['skip_if_exists']
            r = Repository.get_by_repo_name(name)
            if r:
                return r

        if isinstance(kwargs.get('repos_group'), RepoGroup):
            #TODO: rename the repos_group !
            kwargs['repo_group'] = kwargs['repos_group'].group_id
            del kwargs['repos_group']

        form_data = self._get_repo_create_params(repo_name=name, **kwargs)
        cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
        RepoModel().create(form_data, cur_user)
        Session().commit()
        return Repository.get_by_repo_name(name)

    def create_fork(self, repo_to_fork, fork_name, **kwargs):
        repo_to_fork = Repository.get_by_repo_name(repo_to_fork)

        form_data = self._get_repo_create_params(repo_name=fork_name,
                                            fork_parent_id=repo_to_fork,
                                            repo_type=repo_to_fork.repo_type,
                                            **kwargs)
        form_data['update_after_clone'] = False

        #TODO: fix it !!
        form_data['description'] = form_data['repo_description']
        form_data['private'] = form_data['repo_private']
        form_data['landing_rev'] = form_data['repo_landing_rev']

        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
        RepoModel().create_fork(form_data, cur_user=owner)
        Session().commit()
        r = Repository.get_by_repo_name(fork_name)
        assert r
        return r

    def destroy_repo(self, repo_name):
        RepoModel().delete(repo_name)
        Session().commit()

    def create_group(self, name, **kwargs):
        if 'skip_if_exists' in kwargs:
            del kwargs['skip_if_exists']
            gr = RepoGroup.get_by_group_name(group_name=name)
            if gr:
                return gr
        form_data = self._get_group_create_params(group_name=name, **kwargs)
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
        gr = ReposGroupModel().create(group_name=form_data['group_name'],
                                 group_description=form_data['group_name'],
                                 owner=owner, parent=form_data['group_parent_id'])
        Session().commit()
        gr = RepoGroup.get_by_group_name(gr.group_name)
        return gr

    def create_user_group(self, name, **kwargs):
        if 'skip_if_exists' in kwargs:
            del kwargs['skip_if_exists']
            gr = UserGroup.get_by_group_name(group_name=name)
            if gr:
                return gr
        form_data = self._get_user_group_create_params(name, **kwargs)
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
        user_group = UserGroupModel().create(name=form_data['users_group_name'],
                        owner=owner, active=form_data['users_group_active'])
        Session().commit()
        user_group = UserGroup.get_by_group_name(user_group.users_group_name)
        return user_group