Changeset - 959e009afcae
[Not reviewed]
stable
0 2 0
Mads Kiilerich - 8 years ago 2018-05-07 00:49:44
mads@kiilerich.com
repos: add missing access control check for repository permission management

This issue was found and reported by
Kacper Szurek
https://security.szurek.pl/
2 files changed with 8 insertions and 6 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/repos.py
Show inline comments
 
@@ -270,203 +270,205 @@ class ReposController(BaseRepoController
 
            defaults = self.__load_data(repo_name)
 
            defaults.update(errors.value)
 
            c.users_array = repo_model.get_users_js()
 
            return htmlfill.render(
 
                render('admin/repos/repo_edit.html'),
 
                defaults=defaults,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of repository %s') \
 
                    % repo_name, category='error')
 
        return redirect(url('edit_repo', repo_name=changed_name))
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def delete(self, repo_name):
 
        """
 
        DELETE /repos/repo_name: Delete an existing item"""
 
        # Forms posted to this method should contain a hidden field:
 
        #    <input type="hidden" name="_method" value="DELETE" />
 
        # Or using helpers:
 
        #    h.form(url('delete_repo', repo_name=ID),
 
        #           method='delete')
 
        # url('delete_repo', repo_name=ID)
 

	
 
        repo_model = RepoModel()
 
        repo = repo_model.get_by_repo_name(repo_name)
 
        if not repo:
 
            h.not_mapped_error(repo_name)
 
            return redirect(url('repos'))
 
        try:
 
            _forks = repo.forks.count()
 
            handle_forks = None
 
            if _forks and request.POST.get('forks'):
 
                do = request.POST['forks']
 
                if do == 'detach_forks':
 
                    handle_forks = 'detach'
 
                    h.flash(_('Detached %s forks') % _forks, category='success')
 
                elif do == 'delete_forks':
 
                    handle_forks = 'delete'
 
                    h.flash(_('Deleted %s forks') % _forks, category='success')
 
            repo_model.delete(repo, forks=handle_forks)
 
            action_logger(self.authuser, 'admin_deleted_repo',
 
                  repo_name, self.ip_addr, self.sa)
 
            ScmModel().mark_for_invalidation(repo_name)
 
            h.flash(_('Deleted repository %s') % repo_name, category='success')
 
            Session().commit()
 
        except AttachedForksError:
 
            h.flash(_('Cannot delete repository %s which still has forks')
 
                        % repo_name, category='warning')
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during deletion of %s') % repo_name,
 
                    category='error')
 

	
 
        if repo.group:
 
            return redirect(url('repos_group_home', group_name=repo.group.group_name))
 
        return redirect(url('repos'))
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def edit(self, repo_name):
 
        """GET /repo_name/settings: Form to edit an existing item"""
 
        # url('edit_repo', repo_name=ID)
 
        defaults = self.__load_data(repo_name)
 
        c.repo_fields = RepositoryField.query()\
 
            .filter(RepositoryField.repository == c.repo_info).all()
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.active = 'settings'
 
        return htmlfill.render(
 
            render('admin/repos/repo_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def edit_permissions(self, repo_name):
 
        """GET /repo_name/settings: Form to edit an existing item"""
 
        # url('edit_repo', repo_name=ID)
 
        c.repo_info = self._load_repo(repo_name)
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.user_groups_array = repo_model.get_user_groups_js()
 
        c.active = 'permissions'
 
        defaults = RepoModel()._get_defaults(repo_name)
 

	
 
        return htmlfill.render(
 
            render('admin/repos/repo_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def edit_permissions_update(self, repo_name):
 
        form = RepoPermsForm()().to_python(request.POST)
 
        RepoModel()._update_permissions(repo_name, form['perms_new'],
 
                                        form['perms_updates'])
 
        #TODO: implement this
 
        #action_logger(self.authuser, 'admin_changed_repo_permissions',
 
        #              repo_name, self.ip_addr, self.sa)
 
        Session().commit()
 
        h.flash(_('Repository permissions updated'), category='success')
 
        return redirect(url('edit_repo_perms', repo_name=repo_name))
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def edit_permissions_revoke(self, repo_name):
 
        try:
 
            obj_type = request.POST.get('obj_type')
 
            obj_id = None
 
            if obj_type == 'user':
 
                obj_id = safe_int(request.POST.get('user_id'))
 
            elif obj_type == 'user_group':
 
                obj_id = safe_int(request.POST.get('user_group_id'))
 

	
 
            if obj_type == 'user':
 
                RepoModel().revoke_user_permission(repo=repo_name, user=obj_id)
 
            elif obj_type == 'user_group':
 
                RepoModel().revoke_user_group_permission(
 
                    repo=repo_name, group_name=obj_id
 
                )
 
            #TODO: implement this
 
            #action_logger(self.authuser, 'admin_revoked_repo_permissions',
 
            #              repo_name, self.ip_addr, self.sa)
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during revoking of permission'),
 
                    category='error')
 
            raise HTTPInternalServerError()
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def edit_fields(self, repo_name):
 
        """GET /repo_name/settings: Form to edit an existing item"""
 
        # url('edit_repo', repo_name=ID)
 
        c.repo_info = self._load_repo(repo_name)
 
        c.repo_fields = RepositoryField.query()\
 
            .filter(RepositoryField.repository == c.repo_info).all()
 
        c.active = 'fields'
 
        if request.POST:
 

	
 
            return redirect(url('repo_edit_fields'))
 
        return render('admin/repos/repo_edit.html')
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def create_repo_field(self, repo_name):
 
        try:
 
            form_result = RepoFieldForm()().to_python(dict(request.POST))
 
            new_field = RepositoryField()
 
            new_field.repository = Repository.get_by_repo_name(repo_name)
 
            new_field.field_key = form_result['new_field_key']
 
            new_field.field_type = form_result['new_field_type']  # python type
 
            new_field.field_value = form_result['new_field_value']  # set initial blank value
 
            new_field.field_desc = form_result['new_field_desc']
 
            new_field.field_label = form_result['new_field_label']
 
            Session().add(new_field)
 
            Session().commit()
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            msg = _('An error occurred during creation of field')
 
            if isinstance(e, formencode.Invalid):
 
                msg += ". " + e.msg
 
            h.flash(msg, category='error')
 
        return redirect(url('edit_repo_fields', repo_name=repo_name))
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def delete_repo_field(self, repo_name, field_id):
 
        field = RepositoryField.get_or_404(field_id)
 
        try:
 
            Session().delete(field)
 
            Session().commit()
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            msg = _('An error occurred during removal of field')
 
            h.flash(msg, category='error')
 
        return redirect(url('edit_repo_fields', repo_name=repo_name))
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def edit_advanced(self, repo_name):
 
        """GET /repo_name/settings: Form to edit an existing item"""
 
        # url('edit_repo', repo_name=ID)
 
        c.repo_info = self._load_repo(repo_name)
 
        c.default_user_id = User.get_default_user().user_id
 
        c.in_public_journal = UserFollowing.query()\
 
            .filter(UserFollowing.user_id == c.default_user_id)\
 
            .filter(UserFollowing.follows_repository == c.repo_info).scalar()
 

	
 
        _repos = Repository.query().order_by(Repository.repo_name).all()
 
        read_access_repos = RepoList(_repos)
 
        c.repos_list = [(None, _('-- Not a fork --'))]
 
        c.repos_list += [(x.repo_id, x.repo_name)
 
                         for x in read_access_repos
 
                         if x.repo_id != c.repo_info.repo_id]
 

	
 
        defaults = {
 
            'id_fork_of': c.repo_info.fork.repo_id if c.repo_info.fork else ''
 
        }
 

	
 
        c.active = 'advanced'
 
        if request.POST:
 
            return redirect(url('repo_edit_advanced'))
 
        return htmlfill.render(
kallithea/tests/functional/test_admin_permissions.py
Show inline comments
 
from kallithea.model.db import User, UserIpMap
 
from kallithea.tests import *
 

	
 
class TestAdminPermissionsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_permissions'))
 
        # Test response...
 

	
 
    def test_index_ips(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_permissions_ips'))
 
        # Test response...
 
        response.mustcontain('All IP addresses are allowed')
 

	
 
    def test_add_ips(self):
 
        self.log_user()
 
        default_user_id = User.get_default_user().user_id
 
        response = self.app.put(url('edit_user_ips', id=default_user_id),
 
                                 params=dict(new_ip='127.0.0.0/24',
 
                                 _authentication_token=self.authentication_token()))
 

	
 
        response = self.app.get(url('admin_permissions_ips'))
 
        response.mustcontain('127.0.0.0/24')
 
        response.mustcontain('127.0.0.0 - 127.0.0.255')
 

	
 
        ## delete
 
        default_user_id = User.get_default_user().user_id
 
        del_ip_id = UserIpMap.query().filter(UserIpMap.user_id ==
 
                                             default_user_id).first().ip_id
 

	
 
        response = self.app.post(url('edit_user_ips', id=default_user_id),
 
                                 params=dict(_method='delete',
 
                                             del_ip_id=del_ip_id,
 
                                             _authentication_token=self.authentication_token()))
 

	
 
        response = self.app.get(url('admin_permissions_ips'))
 
        response.mustcontain('All IP addresses are allowed')
 
        response.mustcontain(no=['127.0.0.0/24'])
 
        response.mustcontain(no=['127.0.0.0 - 127.0.0.255'])
 

	
 

	
 
    def test_index_overview(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_permissions_perms'))
 
        # Test response...
 

	
 
    def test_edit_permissions_permissions(self):
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 

	
 
        # Test unauthenticated access
 
        # FIXME: access without authentication
 
        # Test unauthenticated access - it will redirect to login page
 
        response = self.app.post(
 
            url('edit_repo_perms_update', repo_name=HG_REPO),
 
            params=dict(
 
                _method='put',
 
                perm_new_member_1='repository.read',
 
                perm_new_member_name_1=user.username,
 
                perm_new_member_type_1='user',
 
                _authentication_token=self.authentication_token()),
 
            status=302)
 

	
 
        assert response.location.endswith(url('edit_repo_perms_update', repo_name=HG_REPO))
 
        assert not response.location.endswith(url('edit_repo_perms_update', repo_name=HG_REPO))
 
        assert response.location.endswith(url('login_home', came_from=url('edit_repo_perms_update', repo_name=HG_REPO)))
 

	
 
        # FIXME: access without authentication
 
        response = self.app.post(
 
            url('edit_repo_perms_revoke', repo_name=HG_REPO),
 
            params=dict(
 
                _method='delete',
 
                obj_type='user',
 
                user_id=user.user_id,
 
                _authentication_token=self.authentication_token()),
 
            status=200) # success has no content
 
        assert not response.body
 
            status=302)
 

	
 
        assert response.location.endswith(url('login_home', came_from=url('edit_repo_perms_update', repo_name=HG_REPO)))
 

	
 
        # Test authenticated access
 
        self.log_user()
 

	
 
        response = self.app.post(
 
            url('edit_repo_perms_update', repo_name=HG_REPO),
 
            params=dict(
 
                _method='put',
 
                perm_new_member_1='repository.read',
 
                perm_new_member_name_1=user.username,
 
                perm_new_member_type_1='user',
 
                _authentication_token=self.authentication_token()),
 
            status=302)
 

	
 
        assert response.location.endswith(url('edit_repo_perms_update', repo_name=HG_REPO))
 

	
 
        response = self.app.post(
 
            url('edit_repo_perms_revoke', repo_name=HG_REPO),
 
            params=dict(
 
                _method='delete',
 
                obj_type='user',
 
                user_id=user.user_id,
 
                _authentication_token=self.authentication_token()),
 
            status=200) # success has no content
 
        assert not response.body
0 comments (0 inline, 0 general)