# HG changeset patch # User Mads Kiilerich # Date 2018-05-07 11:38:40 # Node ID fa3365c940644a52e7fc842c83dcca4295ce586b # Parent 02e0d2d469bf2f5b7c8c99e85ec6f96e13b00409 repos: introduce low level check of clone URIs to prevent direct file system access to local repos This is already checked in web form validation, but also check at low level to make sure API access enforce the same invariants. This issue was found and reported by Kacper Szurek https://security.szurek.pl/ diff --git a/kallithea/model/repo.py b/kallithea/model/repo.py --- a/kallithea/model/repo.py +++ b/kallithea/model/repo.py @@ -33,7 +33,7 @@ import traceback from datetime import datetime from sqlalchemy.orm import subqueryload -from kallithea.lib.utils import make_ui +from kallithea.lib.utils import make_ui, is_valid_repo_uri from kallithea.lib.vcs.backends import get_backend from kallithea.lib.compat import json from kallithea.lib.utils2 import LazyProperty, safe_str, safe_unicode, \ @@ -335,6 +335,10 @@ class RepoModel(BaseModel): setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k]) clone_uri = kwargs.get('clone_uri') if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden: + # clone_uri is modified - if given a value, check it is valid + if clone_uri != '': + # will raise exception on error + is_valid_repo_uri(cur_repo.repo_type, clone_uri, make_ui('db', clear_session=False)) cur_repo.clone_uri = clone_uri if 'repo_name' in kwargs: @@ -399,6 +403,9 @@ class RepoModel(BaseModel): new_repo.group = repo_group new_repo.description = description or repo_name new_repo.private = private + if clone_uri: + # will raise exception on error + is_valid_repo_uri(repo_type, clone_uri, make_ui('db', clear_session=False)) new_repo.clone_uri = clone_uri new_repo.landing_rev = landing_rev @@ -677,11 +684,7 @@ class RepoModel(BaseModel): Makes repository on filesystem. Operation is group aware, meaning that it will create a repository within a group, and alter the paths accordingly to the group location. - :param repo_name: - :param alias: - :param parent: - :param clone_uri: - :param repo_store_location: + Note: clone_uri is low level and not validated - it might be a file system path used for validated cloning """ from kallithea.lib.utils import is_valid_repo, is_valid_repo_group from kallithea.model.scm import ScmModel diff --git a/kallithea/tests/api/api_base.py b/kallithea/tests/api/api_base.py --- a/kallithea/tests/api/api_base.py +++ b/kallithea/tests/api/api_base.py @@ -278,8 +278,12 @@ class _BaseTestApi(object): self._compare_error(id_, expected, given=response.body) def test_api_pull(self): + # Note: pulling from local repos is a mis-feature - it will bypass access control + # ... but ok, if the path already has been set in the database repo_name = 'test_pull' r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE) + # hack around that clone_uri can't be set to to a local path + # (as shown by test_api_create_repo_clone_uri_local) r.clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO) Session.add(r) Session.commit() @@ -995,7 +999,8 @@ class _BaseTestApi(object): fixture.destroy_repo(repo_name) def test_api_create_repo_clone_uri_local(self): - # FIXME: cloning from local repo is a mis-feature - it will bypass access control + # cloning from local repo was a mis-feature - it would bypass access control + # TODO: introduce other test coverage of actual remote cloning clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO) repo_name = u'api-repo' id_, params = _build_data(self.apikey, 'create_repo', @@ -1005,12 +1010,8 @@ class _BaseTestApi(object): clone_uri=clone_uri, ) response = api_call(self, params) - expected = { - 'msg': 'Created new repository `%s`' % repo_name, - 'success': True, - 'task': None, - } - self._compare_ok(id_, expected, given=response.body) + expected = "failed to create repository `%s`" % repo_name + self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(repo_name) def test_api_create_repo_and_repo_group(self): @@ -1157,8 +1158,8 @@ class _BaseTestApi(object): ('description', {'description': 'new description'}), ('active', {'active': True}), ('active', {'active': False}), - ('clone_uri', {'clone_uri': 'http://example.com/repo'}), - ('clone_uri', {'clone_uri': '/repo'}), # FIXME: pulling from local repo is a mis-feature - it will bypass access control + ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail + ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control ('clone_uri', {'clone_uri': None}), ('landing_rev', {'landing_rev': 'branch:master'}), ('enable_statistics', {'enable_statistics': True}), @@ -1181,11 +1182,15 @@ class _BaseTestApi(object): if changing_attr == 'repo_group': repo_name = '/'.join([updates['group'], repo_name]) try: - expected = { - 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), - 'repository': repo.get_api_data() - } - self._compare_ok(id_, expected, given=response.body) + if changing_attr == 'clone_uri' and updates['clone_uri']: + expected = u'failed to update repo `%s`' % repo_name + self._compare_error(id_, expected, given=response.body) + else: + expected = { + 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), + 'repository': repo.get_api_data() + } + self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) if changing_attr == 'repo_group': @@ -1196,8 +1201,8 @@ class _BaseTestApi(object): ('description', {'description': u'new description'}), ('active', {'active': True}), ('active', {'active': False}), - ('clone_uri', {'clone_uri': 'http://example.com/repo'}), - ('clone_uri', {'clone_uri': '/repo'}), # FIXME: pulling from local repo is a mis-feature - it will bypass access control + ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail + ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control ('clone_uri', {'clone_uri': None}), ('landing_rev', {'landing_rev': 'branch:master'}), ('enable_statistics', {'enable_statistics': True}), @@ -1222,11 +1227,15 @@ class _BaseTestApi(object): if changing_attr == 'repo_group': repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]]) try: - expected = { - 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), - 'repository': repo.get_api_data() - } - self._compare_ok(id_, expected, given=response.body) + if changing_attr == 'clone_uri' and updates['clone_uri']: + expected = u'failed to update repo `%s`' % repo_name + self._compare_error(id_, expected, given=response.body) + else: + expected = { + 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name), + 'repository': repo.get_api_data() + } + self._compare_ok(id_, expected, given=response.body) finally: fixture.destroy_repo(repo_name) if changing_attr == 'repo_group':