Changeset - e2d7830a0438
[Not reviewed]
default
0 3 0
Mads Kiilerich - 3 years ago 2023-03-05 12:02:55
mads@kiilerich.com
ruff: Do not assign a `lambda` expression, use a `def`
3 files changed with 9 insertions and 9 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/api/api_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Tests for the JSON-RPC web api.
 
"""
 

	
 
import os
 
import random
 
import re
 
import string
 
from typing import Sized
 

	
 
import mock
 
import pytest
 
from webtest import TestApp
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model import db, meta
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.pull_request import PullRequestModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture, raise_exception
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = 'test_user_group'
 
TEST_REPO_GROUP = 'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 
    For convenience, the json is returned as str
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, ext_json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 

	
 
jsonify = lambda obj: ext_json.loads(ext_json.dumps(obj))
 
def jsonify(obj):
 
    return ext_json.loads(ext_json.dumps(obj))
 

	
 

	
 
def api_call(test_obj, params):
 
    response = test_obj.app.post(API_URL, content_type='application/json',
 
                                 params=params)
 
    return response
 

	
 

	
 
## helpers
 
def make_user_group(name=TEST_USER_GROUP):
 
    gr = fixture.create_user_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
 
    UserGroupModel().add_user_to_group(user_group=gr,
 
                                       user=base.TEST_USER_ADMIN_LOGIN)
 
    meta.Session().commit()
 
    return gr
 

	
 

	
 
def make_repo_group(name=TEST_REPO_GROUP):
 
    gr = fixture.create_repo_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
 
    meta.Session().commit()
 
    return gr
 

	
 

	
 
class _BaseTestApi(object):
 
    app: TestApp  # assigned by app_fixture in subclass TestController mixin
 
    # assigned in subclass:
 
    REPO: str
 
    REPO_TYPE: str
 
    TEST_REVISION: str
 
    TEST_PR_SRC: str
 
    TEST_PR_DST: str
 
    TEST_PR_REVISIONS: Sized
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        cls.usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        cls.apikey = cls.usr.api_key
 
        cls.test_user = UserModel().create_or_update(
 
            username='test-api',
 
            password='test',
 
            email='test@example.com',
 
            firstname='first',
 
            lastname='last'
 
        )
 
        meta.Session().commit()
 
        cls.TEST_USER_LOGIN = cls.test_user.username
 
        cls.apikey_regular = cls.test_user.api_key
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        pass
 

	
 
    def setup_method(self, method):
 
        make_user_group()
 
        make_repo_group()
 

	
 
    def teardown_method(self, method):
 
        fixture.destroy_user_group(TEST_USER_GROUP)
 
        fixture.destroy_gists()
 
        fixture.destroy_repo_group(TEST_REPO_GROUP)
 

	
 
    def _compare_ok(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': None,
 
            'result': expected
 
        })
 
        given = ext_json.loads(given)
 
        assert expected == given, (expected, given)
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': expected,
 
            'result': None
 
        })
 
        given = ext_json.loads(given)
 
        assert expected == given, (expected, given)
 

	
 
    def test_api_wrong_key(self):
 
        id_, params = _build_data('trololo', 'get_user')
 
        response = api_call(self, params)
 

	
 
        expected = 'Invalid API key'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_null(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_args_is_null(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_is_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_different_args(self):
 
        expected = {
 
            'ascii_letters': string.ascii_letters,
 
            'ws': string.whitespace,
 
            'printables': string.printable
 
        }
 
        id_, params = _build_data(self.apikey, 'test', args=expected)
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 
        self._compare_ok(id_, expected, response.body)
 

	
 
    def test_api_get_users(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        response = api_call(self, params)
 
        ret_all = []
 
        _users = db.User.query().filter_by(is_default_user=False) \
 
            .order_by(db.User.username).all()
 
        for usr in _users:
 
            ret = usr.get_api_data()
 
            ret_all.append(jsonify(ret))
 
        expected = ret_all
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid=base.TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_that_does_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` does not exist" % 'trololo'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = db.User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = db.User.get_by_username(self.TEST_USER_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_with_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'userid is not the same as your user'
kallithea/tests/other/test_libs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.other.test_libs
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Package for testing various lib/helper functions in kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 9, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import hashlib
 
import re
 

	
 
import mock
 
import routes
 
from dateutil import relativedelta
 
from tg import request
 
from tg.util.webtest import test_context
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.lib import webutils
 
from kallithea.lib.utils2 import AttributeDict, get_clone_url, safe_bytes
 
from kallithea.model import db
 
from kallithea.tests import base
 

	
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
 
     '%s://example.com' % proto),
 
    ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
 
                                                '8080'],
 
     '%s://example.com:8080' % proto),
 
]
 

	
 
proto = 'https'
 
TEST_URLS += [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
 
     '%s://example.com' % proto),
 
    ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
 
                                                '8080'],
 
     '%s://example.com:8080' % proto),
 
]
 

	
 

	
 
class TestLibs(base.TestController):
 

	
 
    @base.parametrize('test_url,expected,expected_creds', TEST_URLS)
 
    def test_uri_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import uri_filter
 
        assert uri_filter(test_url) == expected
 

	
 
    @base.parametrize('test_url,expected,expected_creds', TEST_URLS)
 
    def test_credentials_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import credentials_filter
 
        assert credentials_filter(test_url) == expected_creds
 

	
 
    @base.parametrize('str_bool,expected', [
 
                           ('t', True),
 
                           ('true', True),
 
                           ('y', True),
 
                           ('yes', True),
 
                           ('on', True),
 
                           ('1', True),
 
                           ('Y', True),
 
                           ('yeS', True),
 
                           ('Y', True),
 
                           ('TRUE', True),
 
                           ('T', True),
 
                           ('False', False),
 
                           ('F', False),
 
                           ('FALSE', False),
 
                           ('0', False),
 
    ])
 
    def test_asbool(self, str_bool, expected):
 
        from kallithea.lib.utils2 import asbool
 
        assert asbool(str_bool) == expected
 

	
 
    def test_mention_extractor(self):
 
        sample = (
 
            "@first hi there @world here's my email username@example.com "
 
            "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
 
            "@UPPER    @cAmEL @2one_more22 @john please see this http://org.pl "
 
            "@marian.user just do it @marco-polo and next extract @marco_polo "
 
            "user.dot  hej ! not-needed maril@example.com"
 
        )
 

	
 
        expected = set([
 
            '2one_more22', 'first', 'lukaszb', 'one', 'one_more22', 'UPPER', 'cAmEL', 'john',
 
            'marian.user', 'marco-polo', 'marco_polo', 'world'])
 
        assert expected == set(webutils.extract_mentioned_usernames(sample))
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds= -1), '1 second ago'),
 
        (dict(seconds= -60 * 2), '2 minutes ago'),
 
        (dict(hours= -1), '1 hour ago'),
 
        (dict(hours= -24), '1 day ago'),
 
        (dict(hours= -24 * 5), '5 days ago'),
 
        (dict(months= -1), '1 month ago'),
 
        (dict(months= -1, days= -2), '1 month and 2 days ago'),
 
        (dict(months= -1, days= -20), '1 month and 19 days ago'),
 
        (dict(years= -1, months= -1), '1 year and 1 month ago'),
 
        (dict(years= -1, months= -10), '1 year and 10 months ago'),
 
        (dict(years= -2, months= -4), '2 years and 4 months ago'),
 
        (dict(years= -2, months= -11), '2 years and 11 months ago'),
 
        (dict(years= -3, months= -2), '3 years and 2 months ago'),
 
    ])
 
    def test_age(self, age_args, expected):
 
        from kallithea.lib.webutils import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 
            assert age(n + relativedelta.relativedelta(**age_args), now=n) == expected
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds= -1), '1 second ago'),
 
        (dict(seconds= -60 * 2), '2 minutes ago'),
 
        (dict(hours= -1), '1 hour ago'),
 
        (dict(hours= -24), '1 day ago'),
 
        (dict(hours= -24 * 5), '5 days ago'),
 
        (dict(months= -1), '1 month ago'),
 
        (dict(months= -1, days= -2), '1 month ago'),
 
        (dict(months= -1, days= -20), '1 month ago'),
 
        (dict(years= -1, months= -1), '13 months ago'),
 
        (dict(years= -1, months= -10), '22 months ago'),
 
        (dict(years= -2, months= -4), '2 years ago'),
 
        (dict(years= -2, months= -11), '3 years ago'),
 
        (dict(years= -3, months= -2), '3 years ago'),
 
        (dict(years= -4, months= -8), '5 years ago'),
 
    ])
 
    def test_age_short(self, age_args, expected):
 
        from kallithea.lib.webutils import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
 
            assert age(n + relativedelta.relativedelta(**age_args), show_short_version=True, now=n) == expected
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds=1), 'in 1 second'),
 
        (dict(seconds=60 * 2), 'in 2 minutes'),
 
        (dict(hours=1), 'in 1 hour'),
 
        (dict(hours=24), 'in 1 day'),
 
        (dict(hours=24 * 5), 'in 5 days'),
 
        (dict(months=1), 'in 1 month'),
 
        (dict(months=1, days=1), 'in 1 month and 1 day'),
 
        (dict(years=1, months=1), 'in 1 year and 1 month')
 
    ])
 
    def test_age_in_future(self, age_args, expected):
 
        from kallithea.lib.webutils import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 
            assert age(n + relativedelta.relativedelta(**age_args), now=n) == expected
 

	
 
    def test_tag_extractor(self):
 
        sample = (
 
            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
 
            "[requires] [stale] [see<>=>] [see => http://example.com]"
 
            "[requires => url] [lang => python] [just a tag]"
 
            "[,d] [ => ULR ] [obsolete] [desc]]"
 
        )
 
        res = webutils.urlify_text(sample, stylize=True)
 
        assert '<div class="label label-meta" data-tag="tag">tag</div>' in res
 
        assert '<div class="label label-meta" data-tag="obsolete">obsolete</div>' in res
 
        assert '<div class="label label-meta" data-tag="stale">stale</div>' in res
 
        assert '<div class="label label-meta" data-tag="lang">python</div>' in res
 
        assert '<div class="label label-meta" data-tag="requires">requires =&gt; <a href="/url">url</a></div>' in res
 
        assert '<div class="label label-meta" data-tag="tag">tag</div>' in res
 

	
 
    def test_alternative_gravatar(self):
 
        _md5 = lambda s: hashlib.md5(safe_bytes(s)).hexdigest()
 
        def _md5(s):
 
            return hashlib.md5(safe_bytes(s)).hexdigest()
 

	
 
        # mock tg.tmpl_context
 
        def fake_tmpl_context(_url):
 
            _c = AttributeDict()
 
            _c.visual = AttributeDict()
 
            _c.visual.use_gravatar = True
 
            _c.visual.gravatar_url = _url
 

	
 
            return _c
 

	
 
        with mock.patch('kallithea.lib.webutils.url.current', lambda *a, **b: 'https://example.com'):
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                    assert webutils.url.current() == 'https://example.com'
 
                    grav = h.gravatar_url(email_address='test@example.com', size=24)
 
                    assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                grav = h.gravatar_url(email_address='test@example.com', size=24)
 
                assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}')
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                em = 'test@example.com'
 
                grav = h.gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s' % (_md5(em))
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}/{size}')
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                em = 'test@example.com'
 
                grav = h.gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s/%s' % (_md5(em), 24)
 

	
 
            fake = fake_tmpl_context(_url='{scheme}://{netloc}/{md5email}/{size}')
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                em = 'test@example.com'
 
                grav = h.gravatar_url(email_address=em, size=24)
 
                assert grav == 'https://example.com/%s/%s' % (_md5(em), 24)
 

	
 
    @base.parametrize('clone_uri_tmpl,repo_name,username,prefix,expected', [
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', None, '', 'http://vps1:8000/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '', 'http://username@vps1:8000/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', None, '/prefix', 'http://vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'user', '/prefix', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '/prefix', 'http://username@vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'user', '/prefix/', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '/prefix/', 'http://username@vps1:8000/prefix/group/repo1'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', None, '', 'http://vps1:8000/_23'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://username@vps1:8000/_23'),
 
        ('http://{user}@{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://username@vps1:8000/_23'),
 
        ('http://{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://vps1:8000/_23'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', 'username', '', 'https://username@proxy1.example.com/group/repo1'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', None, '', 'https://proxy1.example.com/group/repo1'),
 
        ('https://proxy1.example.com/{user}/{repo}', 'group/repo1', 'username', '', 'https://proxy1.example.com/username/group/repo1'),
 
    ])
 
    def test_clone_url_generator(self, clone_uri_tmpl, repo_name, username, prefix, expected):
 
        clone_url = get_clone_url(clone_uri_tmpl=clone_uri_tmpl, prefix_url='http://vps1:8000' + prefix,
 
                                  repo_name=repo_name, repo_id=23, username=username)
 
        assert clone_url == expected
 

	
 
    def _quick_url(self, text, tmpl="""<a class="changeset_hash" href="%s">%s</a>""", url_=None):
 
        """
 
        Changes `some text url[foo]` => `some text <a href="/">foo</a>
 

	
 
        :param text:
 
        """
 
        # quickly change expected url[] into a link
 
        url_pattern = re.compile(r'(?:url\[)(.+?)(?:\])')
 

	
 
        def url_func(match_obj):
 
            _url = match_obj.groups()[0]
 
            return tmpl % (url_ or '/repo_name/changeset/%s' % _url, _url)
 
        return url_pattern.sub(url_func, text)
 

	
 
    @base.parametrize('sample,expected', [
 
      ("",
 
       ""),
 
      ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
 
       """git-svn-id: <a href="https://svn.apache.org/repos/asf/libcloud/trunk@1441655">https://svn.apache.org/repos/asf/libcloud/trunk@1441655</a> 13f79535-47bb-0310-9956-ffa450edef68"""),
 
      ("from rev 000000000000",
 
       """from rev url[000000000000]"""),
 
      ("from rev 000000000000123123 also rev 000000000000",
 
       """from rev url[000000000000123123] also rev url[000000000000]"""),
 
      ("this should-000 00",
 
       """this should-000 00"""),
 
      ("longtextffffffffff rev 123123123123",
 
       """longtextffffffffff rev url[123123123123]"""),
 
      ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
 
       """rev ffffffffffffffffffffffffffffffffffffffffffffffffff"""),
 
      ("ffffffffffff some text traalaa",
 
       """url[ffffffffffff] some text traalaa"""),
 
       ("""Multi line
 
       123123123123
 
       some text 123123123123
 
       sometimes !
 
       """,
 
       """Multi line<br/>"""
 
       """       url[123123123123]<br/>"""
 
       """       some text url[123123123123]<br/>"""
 
       """       sometimes !"""),
 
    ])
 
    def test_urlify_text(self, sample, expected):
 
        expected = self._quick_url(expected)
 

	
 
        with mock.patch('kallithea.lib.webutils.UrlGenerator.__call__',
 
            lambda self, name, **kwargs: dict(changeset_home='/%(repo_name)s/changeset/%(revision)s')[name] % kwargs,
 
        ):
 
            assert webutils.urlify_text(sample, 'repo_name') == expected
 

	
 
    @base.parametrize('sample,expected,url_', [
 
      ("",
 
       "",
 
       ""),
 
      ("https://svn.apache.org/repos",
 
       """url[https://svn.apache.org/repos]""",
 
       "https://svn.apache.org/repos"),
 
      ("http://svn.apache.org/repos",
 
       """url[http://svn.apache.org/repos]""",
 
       "http://svn.apache.org/repos"),
 
      ("from rev a also rev http://google.com",
 
       """from rev a also rev url[http://google.com]""",
 
       "http://google.com"),
 
      ("http://imgur.com/foo.gif inline http://imgur.com/foo.gif ending http://imgur.com/foo.gif",
 
       """url[http://imgur.com/foo.gif] inline url[http://imgur.com/foo.gif] ending url[http://imgur.com/foo.gif]""",
 
       "http://imgur.com/foo.gif"),
 
      ("""Multi line
 
       https://foo.bar.example.com
 
       some text lalala""",
 
       """Multi line<br/>"""
 
       """       url[https://foo.bar.example.com]<br/>"""
 
       """       some text lalala""",
 
       "https://foo.bar.example.com"),
 
      ("@mention @someone",
 
       """<b>@mention</b> <b>@someone</b>""",
 
       ""),
 
      ("deadbeefcafe 123412341234",
 
       """<a class="changeset_hash" href="/repo_name/changeset/deadbeefcafe">deadbeefcafe</a> <a class="changeset_hash" href="/repo_name/changeset/123412341234">123412341234</a>""",
 
       ""),
 
      ("We support * markup for *bold* markup of *single or multiple* words, "
 
       "*a bit @like http://slack.com*. "
 
       "The first * must come after whitespace and not be followed by whitespace, "
 
       "contain anything but * and newline until the next *, "
 
       "which must not come after whitespace "
 
       "and not be followed by * or alphanumerical *characters*.",
 
       """We support * markup for <b>*bold*</b> markup of <b>*single or multiple*</b> words, """
 
       """<b>*a bit <b>@like</b> <a href="http://slack.com">http://slack.com</a>*</b>. """
 
       """The first * must come after whitespace and not be followed by whitespace, """
 
       """contain anything but * and newline until the next *, """
 
       """which must not come after whitespace """
 
       """and not be followed by * or alphanumerical <b>*characters*</b>.""",
 
       "-"),
 
      ("HTML escaping: <abc> 'single' \"double\" &pointer",
 
       "HTML escaping: &lt;abc&gt; &#39;single&#39; &quot;double&quot; &amp;pointer",
 
       "-"),
 
      # tags are covered by test_tag_extractor
 
    ])
 
    def test_urlify_test(self, sample, expected, url_):
 
        expected = self._quick_url(expected,
 
                                   tmpl="""<a href="%s">%s</a>""", url_=url_)
 
        with mock.patch('kallithea.lib.webutils.UrlGenerator.__call__',
 
            lambda self, name, **kwargs: dict(changeset_home='/%(repo_name)s/changeset/%(revision)s')[name] % kwargs,
 
        ):
 
            assert webutils.urlify_text(sample, 'repo_name', stylize=True) == expected
 

	
 
    @base.parametrize('sample,expected', [
 
      ("deadbeefcafe @mention, and http://foo.bar/ yo",
 
       """<a class="changeset_hash" href="/repo_name/changeset/deadbeefcafe">deadbeefcafe</a>"""
 
       """<a class="message-link" href="#the-link"> <b>@mention</b>, and </a>"""
 
       """<a href="http://foo.bar/">http://foo.bar/</a>"""
 
       """<a class="message-link" href="#the-link"> yo</a>"""),
 
    ])
 
    def test_urlify_link(self, sample, expected):
 
        with mock.patch('kallithea.lib.webutils.UrlGenerator.__call__',
 
            lambda self, name, **kwargs: dict(changeset_home='/%(repo_name)s/changeset/%(revision)s')[name] % kwargs,
 
        ):
 
            assert webutils.urlify_text(sample, 'repo_name', link_='#the-link') == expected
 

	
 
    @base.parametrize('issue_pat,issue_server,issue_sub,sample,expected', [
 
        (r'#(\d+)', 'http://foo/{repo}/issue/\\1', '#\\1',
 
            'issue #123 and issue#456',
 
            """issue <a class="issue-tracker-link" href="http://foo/repo_name/issue/123">#123</a> and """
 
            """issue<a class="issue-tracker-link" href="http://foo/repo_name/issue/456">#456</a>"""),
 
        (r'(?:\s*#)(\d+)', 'http://foo/{repo}/issue/\\1', '#\\1',
 
            'issue #123 and issue#456',
 
            """issue<a class="issue-tracker-link" href="http://foo/repo_name/issue/123">#123</a> and """
 
            """issue<a class="issue-tracker-link" href="http://foo/repo_name/issue/456">#456</a>"""),
 
        # to require whitespace before the issue reference, one may be tempted to use \b...
 
        (r'\bPR(\d+)', 'http://foo/{repo}/issue/\\1', '#\\1',
 
            'issue PR123 and issuePR456',
 
            """issue <a class="issue-tracker-link" href="http://foo/repo_name/issue/123">#123</a> and """
 
            """issuePR456"""),
kallithea/tests/vcs/test_git.py
Show inline comments
 
@@ -569,280 +569,281 @@ class TestGitChangeset(object):
 
        assert not node.added
 
        assert not node.changed
 
        assert node.not_changed
 
        assert not node.removed
 

	
 
        # If node has REMOVED state then trying to fetch it would raise
 
        # ChangesetError exception
 
        chset = self.repo.get_changeset(
 
            'fa6600f6848800641328adbf7811fd2372c02ab2')
 
        path = 'vcs/backends/BaseRepository.py'
 
        with pytest.raises(NodeDoesNotExistError):
 
            chset.get_node(path)
 
        # but it would be one of ``removed`` (changeset's attribute)
 
        assert path in [rf.path for rf in chset.removed]
 

	
 
        chset = self.repo.get_changeset(
 
            '54386793436c938cff89326944d4c2702340037d')
 
        changed = ['setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
 
            'vcs/nodes.py']
 
        assert set(changed) == set([f.path for f in chset.changed])
 

	
 
    def test_commit_message_is_str(self):
 
        for cs in self.repo:
 
            assert isinstance(cs.message, str)
 

	
 
    def test_changeset_author_is_str(self):
 
        for cs in self.repo:
 
            assert isinstance(cs.author, str)
 

	
 
    def test_repo_files_content_is_bytes(self):
 
        changeset = self.repo.get_changeset()
 
        for node in changeset.get_node('/'):
 
            if node.is_file():
 
                assert isinstance(node.content, bytes)
 

	
 
    def test_wrong_path(self):
 
        # There is 'setup.py' in the root dir but not there:
 
        path = 'foo/bar/setup.py'
 
        tip = self.repo.get_changeset()
 
        with pytest.raises(VCSError):
 
            tip.get_node(path)
 

	
 
    def test_author_email(self):
 
        assert 'marcin@python-blog.com' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_email
 
        assert 'lukasz.balcerzak@python-center.pl' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_email
 
        assert '' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_email
 

	
 
    def test_author_username(self):
 
        assert 'Marcin Kuzminski' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_name
 
        assert 'Lukasz Balcerzak' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_name
 
        assert 'marcink none@none' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_name
 

	
 

	
 
class TestGitSpecificWithRepo(_BackendTestMixin):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar/static/js/admin/base.js', content='base'),
 
                    FileNode('foobar/static/admin', content='admin',
 
                        mode=0o120000), # this is a link
 
                    FileNode('foo', content='foo'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('foo2', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_paths_slow_traversing(self):
 
        cs = self.repo.get_changeset()
 
        assert cs.get_node('foobar').get_node('static').get_node('js').get_node('admin').get_node('base.js').content == b'base'
 

	
 
    def test_paths_fast_traversing(self):
 
        cs = self.repo.get_changeset()
 
        assert cs.get_node('foobar/static/js/admin/base.js').content == b'base'
 

	
 
    def test_workdir_get_branch(self):
 
        self.repo.run_git_command(['checkout', '-b', 'production'])
 
        # Regression test: one of following would fail if we don't check
 
        # .git/HEAD file
 
        self.repo.run_git_command(['checkout', 'production'])
 
        assert self.repo.workdir.get_branch() == 'production'
 
        self.repo.run_git_command(['checkout', 'master'])
 
        assert self.repo.workdir.get_branch() == 'master'
 

	
 
    def test_get_diff_runs_git_command_with_hashes(self):
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1)
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1)], cwd=self.repo.path)
 

	
 
    def test_get_diff_runs_git_command_with_str_hashes(self):
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1)
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['show', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(1)], cwd=self.repo.path)
 

	
 
    def test_get_diff_runs_git_command_with_path_if_its_given(self):
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo')
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_does_not_sanitize_valid_context(self):
 
        almost_overflowed_long_int = 2**31-1
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=almost_overflowed_long_int)
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U' + str(almost_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_sanitizes_overflowing_context(self):
 
        overflowed_long_int = 2**31
 
        sanitized_overflowed_long_int = overflowed_long_int-1
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=overflowed_long_int)
 

	
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U' + str(sanitized_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_does_not_sanitize_zero_context(self):
 
        zero_context = 0
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=zero_context)
 

	
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U' + str(zero_context), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_sanitizes_negative_context(self):
 
        negative_context = -10
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=negative_context)
 

	
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U0', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 

	
 
class TestGitRegression(_BackendTestMixin):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('bot/__init__.py', content='base'),
 
                    FileNode('bot/templates/404.html', content='base'),
 
                    FileNode('bot/templates/500.html', content='base'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('bot/build/migrations/1.py', content='foo2'),
 
                    FileNode('bot/build/migrations/2.py', content='foo2'),
 
                    FileNode('bot/build/static/templates/f.html', content='foo2'),
 
                    FileNode('bot/build/static/templates/f1.html', content='foo2'),
 
                    FileNode('bot/build/templates/err.html', content='foo2'),
 
                    FileNode('bot/build/templates/err2.html', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_similar_paths(self):
 
        cs = self.repo.get_changeset()
 
        paths = lambda *n: [x.path for x in n]
 
        def paths(*n):
 
            return [x.path for x in n]
 
        assert paths(*cs.get_nodes('bot')) == ['bot/build', 'bot/templates', 'bot/__init__.py']
 
        assert paths(*cs.get_nodes('bot/build')) == ['bot/build/migrations', 'bot/build/static', 'bot/build/templates']
 
        assert paths(*cs.get_nodes('bot/build/static')) == ['bot/build/static/templates']
 
        # this get_nodes below causes troubles !
 
        assert paths(*cs.get_nodes('bot/build/static/templates')) == ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html']
 
        assert paths(*cs.get_nodes('bot/build/templates')) == ['bot/build/templates/err.html', 'bot/build/templates/err2.html']
 
        assert paths(*cs.get_nodes('bot/templates/')) == ['bot/templates/404.html', 'bot/templates/500.html']
 

	
 

	
 
class TestGitHooks(object):
 
    """
 
    Tests related to hook functionality of Git repositories.
 
    """
 

	
 
    def setup_method(self):
 
        # For each run we want a fresh repo.
 
        self.repo_directory = get_new_dir("githookrepo")
 
        self.repo = GitRepository(self.repo_directory, create=True)
 

	
 
        # Create a dictionary where keys are hook names, and values are paths to
 
        # them in the non-bare repo. Deduplicates code in tests a bit.
 
        self.pre_receive = os.path.join(self.repo.path, '.git', 'hooks', "pre-receive")
 
        self.post_receive = os.path.join(self.repo.path, '.git', 'hooks', "post-receive")
 
        self.kallithea_hooks = {
 
            "pre-receive": self.pre_receive,
 
            "post-receive": self.post_receive,
 
        }
 

	
 
    def test_hooks_created_if_missing(self):
 
        """
 
        Tests if hooks are installed in repository if they are missing.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            if os.path.exists(hook_path):
 
                os.remove(hook_path)
 

	
 
        ScmModel().install_git_hooks(self.repo)
 

	
 
        assert not os.path.exists(self.pre_receive)
 
        assert os.path.exists(self.post_receive)
 

	
 
    def test_kallithea_hooks_updated(self):
 
        """
 
        Tests if hooks are updated if they are Kallithea hooks already.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path, "w") as f:
 
                f.write("KALLITHEA_HOOK_VER=0.0.0\nJUST_BOGUS")
 

	
 
        ScmModel().install_git_hooks(self.repo)
 

	
 
        assert not os.path.exists(self.pre_receive)
 
        with open(self.post_receive) as f:
 
            assert "JUST_BOGUS" not in f.read()
 

	
 
    def test_custom_hooks_untouched(self):
 
        """
 
        Tests if hooks are left untouched if they are not Kallithea hooks.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path, "w") as f:
 
                f.write("#!/bin/bash\n#CUSTOM_HOOK")
 

	
 
        ScmModel().install_git_hooks(self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path) as f:
 
                assert "CUSTOM_HOOK" in f.read()
 

	
 
    def test_custom_hooks_forced_update(self):
 
        """
 
        Tests if hooks are forcefully updated even though they are custom hooks.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path, "w") as f:
 
                f.write("#!/bin/bash\n#CUSTOM_HOOK")
 

	
 
        ScmModel().install_git_hooks(self.repo, force=True)
 

	
 
        with open(self.pre_receive) as f:
 
            assert "KALLITHEA_HOOK_VER" not in f.read()
 
        with open(self.post_receive) as f:
 
            assert "KALLITHEA_HOOK_VER" in f.read()
0 comments (0 inline, 0 general)