Changeset - 57a733313e4f
[Not reviewed]
default
0 3 0
Mads Kiilerich - 7 years ago 2018-05-29 12:25:59
mads@kiilerich.com
repos: introduce low level slug check of repo and group names

The high level web forms already slug-ify repo and repo group names. It might
thus not create the exact repo that was created, but the name will be "safe".

For API, we would rather have it fail than not doing exactly what was requested.

Thus, always verify at low level that the provided name wouldn't be modified by
slugification. This makes sure the API provide allow the same actual names as
the web UI.

This will only influence creation and renaming of repositories and repo groups.
Existing repositories will continue working as before.

This is a slight API change, but it makes the system more stable and can
prevent some security issues - especially XSS attacks.

This issue was found and reported by
Kacper Szurek
https://security.szurek.pl/
3 files changed with 16 insertions and 11 deletions:
0 comments (0 inline, 0 general)
kallithea/model/repo.py
Show inline comments
 
@@ -30,12 +30,13 @@ import os
 
import shutil
 
import logging
 
import traceback
 
from datetime import datetime
 
from sqlalchemy.orm import subqueryload
 

	
 
import kallithea.lib.utils
 
from kallithea.lib.utils import make_ui, is_valid_repo_uri
 
from kallithea.lib.vcs.backends import get_backend
 
from kallithea.lib.utils2 import LazyProperty, safe_str, safe_unicode, \
 
    remove_prefix, obfuscate_url_pw, get_current_authuser
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.hooks import log_delete_repository
 
@@ -311,13 +312,16 @@ class RepoModel(object):
 
                if clone_uri != '':
 
                    # will raise exception on error
 
                    is_valid_repo_uri(cur_repo.repo_type, clone_uri, make_ui('db', clear_session=False))
 
                cur_repo.clone_uri = clone_uri
 

	
 
            if 'repo_name' in kwargs:
 
                cur_repo.repo_name = cur_repo.get_new_name(kwargs['repo_name'])
 
                repo_name = kwargs['repo_name']
 
                if kallithea.lib.utils.repo_name_slug(repo_name) != repo_name:
 
                    raise Exception('invalid repo name %s' % repo_name)
 
                cur_repo.repo_name = cur_repo.get_new_name(repo_name)
 

	
 
            # if private flag is set, reset default permission to NONE
 
            if kwargs.get('repo_private'):
 
                EMPTY_PERM = 'repository.none'
 
                RepoModel().grant_user_permission(
 
                    repo=cur_repo, user='default', perm=EMPTY_PERM
 
@@ -360,12 +364,14 @@ class RepoModel(object):
 
            description = safe_unicode(description)
 
            # repo name is just a name of repository
 
            # while repo_name_full is a full qualified name that is combined
 
            # with name and path of group
 
            repo_name_full = repo_name
 
            repo_name = repo_name.split(self.URL_SEPARATOR)[-1]
 
            if kallithea.lib.utils.repo_name_slug(repo_name) != repo_name:
 
                raise Exception('invalid repo name %s' % repo_name)
 

	
 
            new_repo = Repository()
 
            new_repo.repo_state = state
 
            new_repo.enable_statistics = False
 
            new_repo.repo_name = repo_name_full
 
            new_repo.repo_type = repo_type
kallithea/model/repo_group.py
Show inline comments
 
@@ -29,12 +29,13 @@ Original author and date, and relevant c
 
import os
 
import logging
 
import traceback
 
import shutil
 
import datetime
 

	
 
import kallithea.lib.utils
 
from kallithea.lib.utils2 import LazyProperty
 

	
 
from kallithea.model.db import RepoGroup, Session, Ui, UserRepoGroupToPerm, \
 
    User, Permission, UserGroupRepoGroupToPerm, UserGroup, Repository
 

	
 
log = logging.getLogger(__name__)
 
@@ -132,12 +133,15 @@ class RepoGroupModel(object):
 
                                          group.name)
 
                shutil.move(rm_path, os.path.join(self.repos_path, _d))
 

	
 
    def create(self, group_name, group_description, owner, parent=None,
 
               just_db=False, copy_permissions=False):
 
        try:
 
            if kallithea.lib.utils.repo_name_slug(group_name) != group_name:
 
                raise Exception('invalid repo group name %s' % group_name)
 

	
 
            owner = User.guess_instance(owner)
 
            parent_group = RepoGroup.guess_instance(parent)
 
            new_repo_group = RepoGroup()
 
            new_repo_group.owner = owner
 
            new_repo_group.group_description = group_description or group_name
 
            new_repo_group.parent_group = parent_group
 
@@ -287,13 +291,16 @@ class RepoGroupModel(object):
 
                repo_group.enable_locking = kwargs['enable_locking']
 

	
 
            if 'parent_group_id' in kwargs:
 
                assert kwargs['parent_group_id'] != u'-1', kwargs # RepoGroupForm should have converted to None
 
                repo_group.parent_group = RepoGroup.get(kwargs['parent_group_id'])
 
            if 'group_name' in kwargs:
 
                repo_group.group_name = repo_group.get_new_name(kwargs['group_name'])
 
                group_name = kwargs['group_name']
 
                if kallithea.lib.utils.repo_name_slug(group_name) != group_name:
 
                    raise Exception('invalid repo group name %s' % group_name)
 
                repo_group.group_name = repo_group.get_new_name(group_name)
 
            new_path = repo_group.full_path
 
            Session().add(repo_group)
 

	
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -1060,20 +1060,12 @@ class _BaseTestApi(object):
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 
        if repo_name == '/':
 
            expected = "repo group `` not found"
 
            self._compare_error(id_, expected, given=response.body)
 
        elif repo_name in [':', '<test>']:
 
            # FIXME: special characters and XSS injection should not be allowed
 
            expected = {
 
                'msg': 'Created new repository `%s`' % repo_name,
 
                'success': True,
 
                'task': None,
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        else:
 
            expected = "failed to create repository `%s`" % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_clone_uri_local(self):
 
@@ -1131,13 +1123,13 @@ class _BaseTestApi(object):
 
        repo_group_basename = u'api-repo-repo'
 
        repo_group_name = u'%s/%s' % (TEST_REPO_GROUP, repo_group_basename)
 
        repo_name = u'%s/api-repo' % repo_group_name
 

	
 
        top_group = RepoGroup.get_by_group_name(TEST_REPO_GROUP)
 
        assert top_group
 
        rg = fixture.create_repo_group(repo_group_basename, group_parent_id=top_group)
 
        rg = fixture.create_repo_group(repo_group_basename, parent_group_id=top_group)
 
        Session().commit()
 
        RepoGroupModel().grant_user_permission(repo_group_name,
 
                                               self.TEST_USER_LOGIN,
 
                                               'group.none')
 
        Session().commit()
 

	
0 comments (0 inline, 0 general)