Changeset - 703d3208424c
[Not reviewed]
Bradley M. Kuhn - 11 years ago 2014-07-03 01:05:41
bkuhn@sfconservancy.org
Rename various strings for tests
5 files changed:
0 comments (0 inline, 0 general)
.travis.yml
Show inline comments
 
language: python
 
python:
 
  - "2.5"
 
  - "2.6"
 
  - "2.7"
 

	
 
env:  
 
  - TEST_DB=sqlite:////tmp/rhodecode_test.sqlite
 
  - TEST_DB=mysql://root@127.0.0.1/rhodecode_test
 
  - TEST_DB=postgresql://postgres@127.0.0.1/rhodecode_test
 
  - TEST_DB=sqlite:////tmp/kallithea_test.sqlite
 
  - TEST_DB=mysql://root@127.0.0.1/kallithea_test
 
  - TEST_DB=postgresql://postgres@127.0.0.1/kallithea_test
 

	
 
services:
 
  - mysql
 
  - postgresql
 

	
 
# command to install dependencies
 
before_script:
 
  - mysql -e 'create database rhodecode_test;'
 
  - psql -c 'create database rhodecode_test;' -U postgres
 
  - mysql -e 'create database kallithea_test;'
 
  - psql -c 'create database kallithea_test;' -U postgres
 
  - git --version
 

	
 
before_install:
 
  - sudo apt-get remove git
 
  - sudo add-apt-repository ppa:pdoes/ppa -y
 
  - sudo apt-get update -y
 
  - sudo apt-get install git -y
 

	
 
install:
 
  - pip install mysql-python psycopg2 mock unittest2
 
  - pip install . --use-mirrors
 

	
 
# command to run tests
 
script: nosetests
 

	
 
notifications:
 
    email:
 
        - marcinkuz@gmail.com
 
    irc: "irc.freenode.org#kallithea"
 

	
 
branches:
 
  only:
 
    - master
docs/contributing.rst
Show inline comments
 
.. _contributing:
 

	
 
=========================
 
Contributing to Kallithea
 
=========================
 

	
 
If you would like to contribute to Kallithea, please contact us, any help is
 
greatly appreciated!
 

	
 
Could I request that you make your source contributions by first forking the
 
Kallithea repository on bitbucket_
 
https://bitbucket.org/conservancy/kallithea and then make your changes to
 
your forked repository. Please post all fixes into **dev** bookmark since your
 
change might be already fixed there and i try to merge all fixes from dev into
 
stable, and not the other way. Finally, when you are finished with your changes,
 
please send us a pull request.
 

	
 
To run Kallithea in a development version you always need to install the latest
 
required libs. Simply clone Kallithea and switch to beta branch::
 

	
 
    hg clone https://kallithea-scm.org/repos/kallithea
 

	
 
after downloading/pulling Kallithea make sure you run::
 

	
 
    python setup.py develop
 

	
 
command to install/verify all required packages, and prepare development
 
enviroment.
 

	
 
There are two files in the directory production.ini and developement.ini copy
 
the `development.ini` file as rc.ini (which is excluded from version controll)
 
and put all your changes like db connection or server port in there.
 

	
 
After finishing your changes make sure all tests passes ok. You can run
 
the testsuite running ``nosetest`` from the project root, or if you use tox
 
run tox for python2.5-2.7 with multiple database test. When using `nosetests`
 
test.ini file is used and by default it uses sqlite for tests, edit this file
 
to change your testing enviroment.
 

	
 

	
 
There's a special set of tests for push/pull operations, you can runn them using::
 

	
 
    paster serve test.ini --pid-file=test.pid --daemon
 
    RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests -x kallithea/tests/other/test_vcs_operations.py
 
    KALLITHEA_WHOOSH_TEST_DISABLE=1 KALLITHEA_NO_TMP_PATH=1 nosetests -x kallithea/tests/other/test_vcs_operations.py
 
    kill -9 $(cat test.pid)
 

	
 

	
 
| Thank you for any contributions!
 

	
 

	
 
.. _bitbucket: http://bitbucket.org/
kallithea/config/environment.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
    Pylons environment configuration
 
"""
 

	
 
import os
 
import logging
 
import kallithea
 
import platform
 

	
 
from mako.lookup import TemplateLookup
 
from pylons.configuration import PylonsConfig
 
from pylons.error import handle_mako_error
 

	
 
# don't remove this import it does magic for celery
 
from kallithea.lib import celerypylons
 

	
 
import kallithea.lib.app_globals as app_globals
 

	
 
from kallithea.config.routing import make_map
 

	
 
from kallithea.lib import helpers
 
from kallithea.lib.auth import set_available_permissions
 
from kallithea.lib.utils import repo2db_mapper, make_ui, set_app_settings,\
 
    load_rcextensions, check_git_version, set_vcs_config
 
from kallithea.lib.utils2 import engine_from_config, str2bool
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.model import init_model
 
from kallithea.model.scm import ScmModel
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def load_environment(global_conf, app_conf, initial=False,
 
                     test_env=None, test_index=None):
 
    """
 
    Configure the Pylons environment via the ``pylons.config``
 
    object
 
    """
 
    config = PylonsConfig()
 

	
 
    # Pylons paths
 
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
    paths = dict(
 
        root=root,
 
        controllers=os.path.join(root, 'controllers'),
 
        static_files=os.path.join(root, 'public'),
 
        templates=[os.path.join(root, 'templates')]
 
    )
 

	
 
    # Initialize config with the basic options
 
    config.init_app(global_conf, app_conf, package='kallithea', paths=paths)
 

	
 
    # store some globals into rhodecode
 
    kallithea.CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
    kallithea.CELERY_EAGER = str2bool(config['app_conf'].get('celery.always.eager'))
 

	
 
    config['routes.map'] = make_map(config)
 
    config['pylons.app_globals'] = app_globals.Globals(config)
 
    config['pylons.h'] = helpers
 
    kallithea.CONFIG = config
 

	
 
    load_rcextensions(root_path=config['here'])
 

	
 
    # Setup cache object as early as possible
 
    import pylons
 
    pylons.cache._push_object(config['pylons.app_globals'].cache)
 

	
 
    # Create the Mako TemplateLookup, with the default auto-escaping
 
    config['pylons.app_globals'].mako_lookup = TemplateLookup(
 
        directories=paths['templates'],
 
        error_handler=handle_mako_error,
 
        module_directory=os.path.join(app_conf['cache_dir'], 'templates'),
 
        input_encoding='utf-8', default_filters=['escape'],
 
        imports=['from webhelpers.html import escape'])
 

	
 
    # sets the c attribute access when don't existing attribute are accessed
 
    config['pylons.strict_tmpl_context'] = True
 
    test = os.path.split(config['__file__'])[-1] == 'test.ini'
 
    if test:
 
        if test_env is None:
 
            test_env = not int(os.environ.get('RC_NO_TMP_PATH', 0))
 
            test_env = not int(os.environ.get('KALLITHEA_NO_TMP_PATH', 0))
 
        if test_index is None:
 
            test_index = not int(os.environ.get('RC_WHOOSH_TEST_DISABLE', 0))
 
            test_index = not int(os.environ.get('KALLITHEA_WHOOSH_TEST_DISABLE', 0))
 
        if os.environ.get('TEST_DB'):
 
            # swap config if we pass enviroment variable
 
            config['sqlalchemy.db1.url'] = os.environ.get('TEST_DB')
 

	
 
        from kallithea.lib.utils import create_test_env, create_test_index
 
        from kallithea.tests import TESTS_TMP_PATH
 
        #set RC_NO_TMP_PATH=1 to disable re-creating the database and
 
        #set KALLITHEA_NO_TMP_PATH=1 to disable re-creating the database and
 
        #test repos
 
        if test_env:
 
            create_test_env(TESTS_TMP_PATH, config)
 
        #set RC_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
 
        #set KALLITHEA_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
 
        if test_index:
 
            create_test_index(TESTS_TMP_PATH, config, True)
 

	
 
    DbManage.check_waitress()
 
    # MULTIPLE DB configs
 
    # Setup the SQLAlchemy database engine
 
    sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.')
 
    init_model(sa_engine_db1)
 

	
 
    set_available_permissions(config)
 
    repos_path = make_ui('db').configitems('paths')[0][1]
 
    config['base_path'] = repos_path
 
    set_app_settings(config)
 

	
 
    instance_id = kallithea.CONFIG.get('instance_id')
 
    if instance_id == '*':
 
        instance_id = '%s-%s' % (platform.uname()[1], os.getpid())
 
        kallithea.CONFIG['instance_id'] = instance_id
 

	
 
    # CONFIGURATION OPTIONS HERE (note: all config options will override
 
    # any Pylons config options)
 

	
 
    # store config reference into our module to skip import magic of
 
    # pylons
 
    kallithea.CONFIG.update(config)
 
    set_vcs_config(kallithea.CONFIG)
 

	
 
    #check git version
 
    check_git_version()
 

	
 
    if str2bool(config.get('initial_repo_scan', True)):
 
        repo2db_mapper(ScmModel().repo_scan(repos_path),
 
                       remove_obsolete=False, install_git_hook=False)
 
    return config
kallithea/tests/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 

	
 
nosetests -x - fail on first error
 
nosetests kallithea.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
nosetests --pdb --pdb-failures
 
nosetests --with-coverage --cover-package=kallithea.model.validators kallithea.tests.test_validators
 

	
 
optional FLAGS:
 
    RC_WHOOSH_TEST_DISABLE=1 - skip whoosh index building and tests
 
    RC_NO_TMP_PATH=1 - disable new temp path for tests, used mostly for test_vcs_operations
 
    KALLITHEA_WHOOSH_TEST_DISABLE=1 - skip whoosh index building and tests
 
    KALLITHEA_NO_TMP_PATH=1 - disable new temp path for tests, used mostly for test_vcs_operations
 

	
 
"""
 
import os
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from tempfile import _RandomNameSequence
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 

	
 
import pylons
 
import pylons.test
 
from pylons import config, url
 
from pylons.i18n.translation import _get_translator
 
from pylons.util import ContextObj
 

	
 
from routes.util import URLGenerator
 
from webtest import TestApp
 
from nose.plugins.skip import SkipTest
 

	
 
from kallithea.lib.compat import unittest
 
from kallithea import is_windows
 
from kallithea.model.meta import Session
 
from kallithea.model.db import User
 
from kallithea.tests.nose_parametrized import parameterized
 
from kallithea.lib.utils2 import safe_unicode, safe_str
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
 
    'SkipTest', 'ldap_lib_installed', 'BaseTestCase', 'init_stack',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 
#skip ldap tests if LDAP lib is not installed
 
ldap_lib_installed = False
 
try:
 
    import ldap
 
    ldap_lib_installed = True
 
except ImportError:
 
    # means that python-ldap is not installed
 
    pass
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from kallithea.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
def init_stack(config=None):
 
    if not config:
 
        config = pylons.test.pylonsapp.config
 
    url._push_object(URLGenerator(config['routes.map'], environ))
 
    pylons.app_globals._push_object(config['pylons.app_globals'])
 
    pylons.config._push_object(config)
 
    pylons.tmpl_context._push_object(ContextObj())
 
    # Initialize a translator for tests that utilize i18n
 
    translator = _get_translator(pylons.config.get('lang'))
 
    pylons.translator._push_object(translator)
 

	
 

	
 
class BaseTestCase(unittest.TestCase):
 
    def __init__(self, *args, **kwargs):
 
        self.wsgiapp = pylons.test.pylonsapp
 
        init_stack(self.wsgiapp.config)
 
        unittest.TestCase.__init__(self, *args, **kwargs)
 

	
 

	
 
class TestController(BaseTestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        BaseTestCase.__init__(self, *args, **kwargs)
 
        self.app = TestApp(self.wsgiapp)
 
        self.maxDiff = None
 
        self.index_location = config['app_conf']['index_dir']
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['authuser']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['authuser']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def checkSessionFlash(self, response, msg):
 
        self.assertTrue('flash' in response.session,
 
                        msg='Response session have no flash key' % response.session)
 
        if not msg in response.session['flash'][0][1]:
 
            msg = u'msg `%s` not found in session flash: got `%s` instead' % (
 
                      msg, response.session['flash'][0][1])
 
            self.fail(safe_str(msg))
kallithea/tests/api/api_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
tests for api. run with::
 

	
 
    RC_WHOOSH_TEST_DISABLE=1 nosetests --with-coverage --cover-package=kallithea.controllers.api.api -x kallithea/tests/api
 
    KALLITHEA_WHOOSH_TEST_DISABLE=1 nosetests --with-coverage --cover-package=kallithea.controllers.api.api -x kallithea/tests/api
 
"""
 

	
 
from __future__ import with_statement
 
import os
 
import random
 
import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.db import Repository, User, Setting
 
from kallithea.lib.utils2 import time_to_datetime
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = 'test_user_group'
 
TEST_REPO_GROUP = 'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 

	
 
    :param random_id:
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 

	
 
jsonify = lambda obj: json.loads(json.dumps(obj))
 

	
 

	
 
def crash(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
def api_call(test_obj, params):
 
    response = test_obj.app.post(API_URL, content_type='application/json',
 
                                 params=params)
 
    return response
 

	
 

	
 
## helpers
 
def make_user_group(name=TEST_USER_GROUP):
 
    gr = fixture.create_user_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    UserGroupModel().add_user_to_group(user_group=gr,
 
                                       user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
def make_repo_group(name=TEST_REPO_GROUP):
 
    gr = fixture.create_repo_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
class BaseTestApi(object):
 
    REPO = None
 
    REPO_TYPE = None
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        cls.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        cls.apikey = cls.usr.api_key
 
        cls.test_user = UserModel().create_or_update(
 
            username='test-api',
 
            password='test',
 
            email='test@example.com',
 
            firstname='first',
 
            lastname='last'
 
        )
 
        Session().commit()
 
        cls.TEST_USER_LOGIN = cls.test_user.username
 
        cls.apikey_regular = cls.test_user.api_key
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        pass
 

	
 
    def setUp(self):
 
        self.maxDiff = None
 
        make_user_group()
 
        make_repo_group()
 

	
 
    def tearDown(self):
 
        fixture.destroy_user_group(TEST_USER_GROUP)
 
        fixture.destroy_gists()
 
        fixture.destroy_repo_group(TEST_REPO_GROUP)
 

	
 
    def _compare_ok(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': None,
 
            'result': expected
 
        })
 
        given = json.loads(given)
 
        self.assertEqual(expected, given)
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': expected,
 
            'result': None
 
        })
 
        given = json.loads(given)
 
        self.assertEqual(expected, given)
 

	
 
    def test_Optional_object(self):
 
        from kallithea.controllers.api.api import Optional
 

	
 
        option1 = Optional(None)
 
        self.assertEqual('<Optional:%s>' % None, repr(option1))
 
        self.assertEqual(option1(), None)
 

	
 
        self.assertEqual(1, Optional.extract(Optional(1)))
 
        self.assertEqual('trololo', Optional.extract('trololo'))
 

	
 
    def test_Optional_OAttr(self):
 
        from kallithea.controllers.api.api import Optional, OAttr
 

	
 
        option1 = Optional(OAttr('apiuser'))
 
        self.assertEqual('apiuser', Optional.extract(option1))
 

	
 
    def test_OAttr_object(self):
 
        from kallithea.controllers.api.api import OAttr
 

	
 
        oattr1 = OAttr('apiuser')
 
        self.assertEqual('<OptionalAttr:apiuser>', repr(oattr1))
 
        self.assertEqual(oattr1(), oattr1)
 

	
 
    def test_api_wrong_key(self):
 
        id_, params = _build_data('trololo', 'get_user')
 
        response = api_call(self, params)
 

	
 
        expected = 'Invalid API KEY'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_null(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_args_is_null(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 
        self.assertEqual(response.status, '200 OK')
 

	
 
    def test_api_args_is_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 
        self.assertEqual(response.status, '200 OK')
 

	
 
    def test_api_args_different_args(self):
 
        import string
 
        expected = {
 
            'ascii_letters': string.ascii_letters,
 
            'ws': string.whitespace,
 
            'printables': string.printable
 
        }
 
        id_, params = _build_data(self.apikey, 'test', args=expected)
 
        response = api_call(self, params)
 
        self.assertEqual(response.status, '200 OK')
 
        self._compare_ok(id_, expected, response.body)
 

	
 
    def test_api_get_users(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        response = api_call(self, params)
 
        ret_all = []
 
        _users = User.query().filter(User.username != User.DEFAULT_USER) \
 
            .order_by(User.username).all()
 
        for usr in _users:
 
            ret = usr.get_api_data()
 
            ret_all.append(jsonify(ret))
 
        expected = ret_all
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(usr.user_id).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_that_does_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` does not exist" % 'trololo'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(usr.user_id).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(usr.user_id).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_with_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull(self):
 
        repo_name = 'test_pull'
 
        r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        r.clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
 
        Session.add(r)
 
        Session.commit()
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_pull_error(self):
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=self.REPO, )
 
        response = api_call(self, params)
 

	
 
        expected = 'Unable to pull changes from `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_rescan_repos(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos')
 
        response = api_call(self, params)
 

	
 
        expected = {'added': [], 'removed': []}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'repo_scan', crash)
 
    def test_api_rescann_error(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos', )
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during rescan repositories action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache(self):
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        repo.scm_instance_cached()  # seed cache
 

	
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': "Cache for repository `%s` was invalidated" % (self.REPO,),
 
            'repository': self.REPO
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
 
    def test_api_invalidate_cache_error(self):
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during cache invalidation action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache_regular_user_no_permission(self):
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        repo.scm_instance_cached() # seed cache
 

	
 
        id_, params = _build_data(self.apikey_regular, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = "repository `%s` does not exist" % (self.REPO,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = {
 
            'repo': self.REPO, 'locked': True,
 
            'locked_since': response.json['result']['locked_since'],
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': True,
 
            'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                    % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire_by_non_admin(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'lock',
 
                                      repoid=repo_name,
 
                                      locked=True)
 
            response = api_call(self, params)
 
            expected = {
 
                'repo': repo_name,
 
                'locked': True,
 
                'locked_since': response.json['result']['locked_since'],
 
                'locked_by': self.TEST_USER_LOGIN,
 
                'lock_state_changed': True,
 
                'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                        % (self.TEST_USER_LOGIN, repo_name, True))
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_lock_repo_lock_aquire_non_admin_with_userid(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'lock',
 
                                      userid=TEST_USER_ADMIN_LOGIN,
 
                                      repoid=repo_name,
 
                                      locked=True)
 
            response = api_call(self, params)
 
            expected = 'userid is not the same as your user'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
 
        id_, params = _build_data(self.apikey_regular, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_release(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=False)
 
        response = api_call(self, params)
 
        expected = {
 
            'repo': self.REPO,
 
            'locked': False,
 
            'locked_since': None,
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': True,
 
            'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                    % (TEST_USER_ADMIN_LOGIN, self.REPO, False))
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire_optional_userid(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        time_ = response.json['result']['locked_since']
 
        expected = {
 
            'repo': self.REPO,
 
            'locked': True,
 
            'locked_since': time_,
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': True,
 
            'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                    % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
 
        }
 

	
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_optional_locked(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 
        time_ = response.json['result']['locked_since']
 
        expected = {
 
            'repo': self.REPO,
 
            'locked': True,
 
            'locked_since': time_,
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': False,
 
            'msg': ('Repo `%s` locked by `%s` on `%s`.'
 
                    % (self.REPO, TEST_USER_ADMIN_LOGIN,
 
                       json.dumps(time_to_datetime(time_))))
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_optional_not_locked(self):
 
        repo_name = 'api_not_locked'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        self.assertEqual(repo.locked, [None, None])
 
        try:
 
            id_, params = _build_data(self.apikey, 'lock',
 
                                      repoid=repo.repo_id)
 
            response = api_call(self, params)
 
            expected = {
 
                'repo': repo_name,
 
                'locked': False,
 
                'locked_since': None,
 
                'locked_by': None,
 
                'lock_state_changed': False,
 
                'msg': ('Repo `%s` not locked.' % (repo_name,))
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(Repository, 'lock', crash)
 
    def test_api_lock_error(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred locking repository `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks_regular_user(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_locks')
 
        response = api_call(self, params)
 
        expected = []
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks_with_userid_regular_user(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_locks',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks(self):
 
        id_, params = _build_data(self.apikey, 'get_locks')
 
        response = api_call(self, params)
 
        expected = []
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks_with_one_locked_repo(self):
 
        repo_name = 'api_delete_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                                   cur_user=self.TEST_USER_LOGIN)
 
        Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
 
        try:
 
            id_, params = _build_data(self.apikey, 'get_locks')
 
            response = api_call(self, params)
 
            expected = [repo.get_api_data()]
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_get_locks_with_one_locked_repo_for_specific_user(self):
 
        repo_name = 'api_delete_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                                   cur_user=self.TEST_USER_LOGIN)
 
        Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
 
        try:
 
            id_, params = _build_data(self.apikey, 'get_locks',
 
                                      userid=self.TEST_USER_LOGIN)
 
            response = api_call(self, params)
 
            expected = [repo.get_api_data()]
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_get_locks_with_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_locks',
 
                                  userid=TEST_USER_REGULAR_LOGIN)
 
        response = api_call(self, params)
 
        expected = []
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_create_existing_user(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN,
 
                                  email='test@foo.com',
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user_with_existing_email(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN + 'new',
 
                                  email=TEST_USER_REGULAR_EMAIL,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user(self):
 
        username = 'test_new_api_user'
 
        email = username + "@foo.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 

	
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    def test_api_create_user_without_password(self):
 
        username = 'test_new_api_user_passwordless'
 
        email = username + "@foo.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email)
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    def test_api_create_user_with_extern_name(self):
 
        username = 'test_new_api_user_passwordless'
 
        email = username + "@foo.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email, extern_name='rhodecode')
 
        response = api_call(self, params)
 

	
 
        usr = UserModel().get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    @mock.patch.object(UserModel, 'create_or_update', crash)
 
    def test_api_create_user_when_exception_happened(self):
 

	
 
        username = 'test_new_api_user'
 
        email = username + "@foo.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 
        expected = 'failed to create user `%s`' % username
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_delete_user(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@example.com',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 
        email = usr.email
 
        usr_id = usr.user_id
 
        ## DELETE THIS USER NOW
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user',
 
                                  userid=username, )
 
        response = api_call(self, params)
 

	
 
        ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
 
               'user': None}
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'delete', crash)
 
    def test_api_delete_user_when_exception_happened(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@example.com',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user',
 
                                  userid=username, )
 
        response = api_call(self, params)
 
        ret = 'failed to delete user ID:%s %s' % (usr.user_id,
 
                                                  usr.username)
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('firstname', 'new_username'),
 
                           ('lastname', 'new_username'),
 
                           ('email', 'new_username'),
 
                           ('admin', True),
 
                           ('admin', False),
 
                           ('extern_type', 'ldap'),
 
                           ('extern_type', None),
 
                           ('extern_name', 'test'),
 
                           ('extern_name', None),
 
                           ('active', False),
 
                           ('active', True),
 
                           ('password', 'newpass')
 
    ])
 
    def test_api_update_user(self, name, expected):
 
        usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
 
        kw = {name: expected,
 
              'userid': usr.user_id}
 
        id_, params = _build_data(self.apikey, 'update_user', **kw)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, self.TEST_USER_LOGIN),
 
            'user': jsonify(UserModel() \
 
                .get_by_username(self.TEST_USER_LOGIN) \
 
                .get_api_data())
 
        }
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_no_changed_params(self):
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_by_user_id(self):
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_default_user(self):
 
        usr = User.get_default_user()
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        expected = 'editing default user is forbidden'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'update_user', crash)
 
    def test_api_update_user_when_exception_happens(self):
 
        usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = 'failed to update user `%s`' % usr.user_id
 

	
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo(self):
 
        new_group = 'some_new_group'
 
        make_user_group(new_group)
 
        RepoModel().grant_user_group_permission(repo=self.REPO,
 
                                                group_name=new_group,
 
                                                perm='repository.read')
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {'name': user.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {'name': user_group.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_user_group(new_group)
 

	
 
    @parameterized.expand([
 
        ('repository.admin',),
 
        ('repository.write',),
 
        ('repository.read',),
 
    ])
 
    def test_api_get_repo_by_non_admin(self, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        self.assertEqual(2, len(repo.repo_to_perm))
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user_obj = user.user
 
            user_data = {'name': user_obj.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group_obj = user_group.users_group
 
            user_group_data = {'name': user_group_obj.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 

	
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_that_doesn_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid='no-such-repo')
 
        response = api_call(self, params)
 

	
 
        ret = 'repository `%s` does not exist' % 'no-such-repo'
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos(self):
 
        id_, params = _build_data(self.apikey, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        result = []
 
        for repo in RepoModel().get_all():
 
            result.append(repo.get_api_data())
 
        ret = jsonify(result)
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        result = []
 
        for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN):
 
            result.append(repo.get_api_data())
 
        ret = jsonify(result)
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('all', 'all'),
 
                           ('dirs', 'dirs'),
 
                           ('files', 'files'), ])
 
    def test_api_get_repo_nodes(self, name, ret_type):
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_revisions(self):
 
        rev = 'i-dont-exist'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_path(self):
 
        rev = 'tip'
 
        path = '/idontexits'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_ret_type(self):
 
        rev = 'tip'
 
        path = '/'
 
        ret_type = 'error'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        expected = ('ret_type must be one of %s'
 
                    % (','.join(['files', 'dirs', 'all'])))
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('all', 'all', 'repository.write'),
 
                           ('dirs', 'dirs', 'repository.admin'),
 
                           ('files', 'files', 'repository.read'), ])
 
    def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 

	
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_create_repo(self):
 
        repo_name = 'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_in_group(self):
 
        repo_name = 'my_gr/api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        print params
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 
        fixture.destroy_repo_group('my_gr')
 

	
 
    def test_api_create_repo_unknown_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=owner,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dont_specify_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin_specify_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
                                  owner=owner)
 
        response = api_call(self, params)
 

	
 
        expected = 'Only RhodeCode admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_exists(self):
 
        repo_name = self.REPO
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = "repo `%s` already exist" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create', crash)
 
    def test_api_create_repo_exception_occurred(self):
 
        repo_name = 'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = 'failed to create repository `%s`' % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': 'new description'}),
 
        ('active', {'active': True}),
 
        ('active', {'active': False}),
 
        ('clone_uri', {'clone_uri': 'http://foo.com/repo'}),
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
        ('enable_locking', {'enable_locking': True}),
 
        ('enable_downloads', {'enable_downloads': True}),
 
        ('name', {'name': 'new_repo_name'}),
 
        ('repo_group', {'group': 'test_group_for_update'}),
 
    ])
 
    def test_api_update_repo(self, changing_attr, updates):
 
        repo_name = 'api_update_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        if changing_attr == 'repo_group':
 
            fixture.create_repo_group(updates['group'])
 

	
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        if changing_attr == 'name':
 
            repo_name = updates['name']
 
        if changing_attr == 'repo_group':
 
            repo_name = '/'.join([updates['group'], repo_name])
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
 

	
 
    def test_api_update_repo_repo_group_does_not_exist(self):
 
        repo_name = 'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'group': 'test_group_for_update'}
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository group `%s` does not exist' % updates['group']
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_not_allowed(self):
 
        repo_name = 'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'active': False}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository `%s` does not exist' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(RepoModel, 'update', crash)
 
    def test_api_update_repo_exception_occured(self):
 
        repo_name = 'api_update_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update repo `%s`' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 

	
 
        id_, params = _build_data(self.apikey, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin_no_permission(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                      repoid=repo_name, )
 
            response = api_call(self, params)
 
            expected = 'repository `%s` does not exist' % (repo_name)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_exception_occurred(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            with mock.patch.object(RepoModel, 'delete', crash):
 
                id_, params = _build_data(self.apikey, 'delete_repo',
 
                                          repoid=repo_name, )
 
                response = api_call(self, params)
 

	
 
                expected = 'failed to delete repository `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_fork_repo(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_specify_owner(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 
        expected = 'Only RhodeCode admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_no_permission_to_fork(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_unknown_owner(self):
 
        fork_name = 'api-repo-fork'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=owner,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_fork_repo_fork_exists(self):
 
        fork_name = 'api-repo-fork'
 
        fixture.create_fork(self.REPO, fork_name)
 

	
 
        try:
 
            fork_name = 'api-repo-fork'
 

	
 
            id_, params = _build_data(self.apikey, 'fork_repo',
 
                                      repoid=self.REPO,
 
                                      fork_name=fork_name,
 
                                      owner=TEST_USER_ADMIN_LOGIN,
 
            )
 
            response = api_call(self, params)
 

	
 
            expected = "fork `%s` already exist" % fork_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_repo_exists(self):
 
        fork_name = self.REPO
 

	
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = "repo `%s` already exist" % fork_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create_fork', crash)
 
    def test_api_fork_repo_exception_occurred(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
 
                                                               fork_name)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_group(self):
 
        id_, params = _build_data(self.apikey, 'get_user_group',
 
                                  usergroupid=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        user_group = UserGroupModel().get_group(TEST_USER_GROUP)
 
        members = []
 
        for user in user_group.members:
 
            user = user.user
 
            members.append(user.get_api_data())
 

	
 
        ret = user_group.get_api_data()
 
        ret['members'] = members
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_groups(self):
 
        gr_name = 'test_user_group2'
 
        make_user_group(gr_name)
 

	
 
        id_, params = _build_data(self.apikey, 'get_user_groups', )
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = []
 
            for gr_name in [TEST_USER_GROUP, 'test_user_group2']:
 
                user_group = UserGroupModel().get_group(gr_name)
 
                ret = user_group.get_api_data()
 
                expected.append(ret)
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_create_user_group(self):
 
        group_name = 'some_new_group'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'created new user group `%s`' % group_name,
 
            'user_group': jsonify(UserGroupModel() \
 
                .get_by_name(group_name) \
 
                .get_api_data())
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_user_group(group_name)
 

	
 
    def test_api_get_user_group_that_exist(self):
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        expected = "user group `%s` already exist" % TEST_USER_GROUP
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'create', crash)
 
    def test_api_get_user_group_exception_occurred(self):
 
        group_name = 'exception_happens'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to create group `%s`' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('group_name', {'group_name': 'new_group_name'}),
 
                           ('group_name', {'group_name': 'test_group_for_update'}),
 
                           ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
                           ('active', {'active': False}),
 
                           ('active', {'active': True})])
 
    def test_api_update_user_group(self, changing_attr, updates):
 
        gr_name = 'test_group_for_update'
 
        user_group = fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'update_user_group',
 
                                  usergroupid=gr_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
               'msg': 'updated user group ID:%s %s' % (user_group.users_group_id,
 
                                                     user_group.users_group_name),
 
               'user_group': user_group.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            if changing_attr == 'group_name':
 
                # switch to updated name for proper cleanup
 
                gr_name = updates['group_name']
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'update', crash)
 
    def test_api_update_user_group_exception_occured(self):
 
        gr_name = 'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'update_user_group',
 
                                  usergroupid=gr_name)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group(self):
 
        gr_name = 'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
            'msg': 'added member `%s` to user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name),
 
            'success': True
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group_that_doesnt_exist(self):
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid='false-group',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'user group `%s` does not exist' % 'false-group'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'add_user_to_group', crash)
 
    def test_api_add_user_to_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = 'failed to add member to user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_remove_user_from_user_group(self):
 
        gr_name = 'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = {
 
                'msg': 'removed member `%s` from user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name
 
                ),
 
                'success': True}
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'remove_user_from_group', crash)
 
    def test_api_remove_user_from_user_group_exception_occurred(self):
 
        gr_name = 'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to remove member from user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group(self):
 
        gr_name = 'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	

Changeset was too big and was cut off... Show full diff anyway

0 comments (0 inline, 0 general)