Changeset - 81db5704b285
[Not reviewed]
stable
0 6 0
Thomas De Schampheleire - 7 years ago 2019-01-26 20:00:14
thomas.de_schampheleire@nokia.com
cleanup: remove unnecessary (and potentially problematic) use of 'literal'

webhelpers.html.literal (kallithea.lib.helpers.literal) is only needed when
the passed string may contain HTML that needs to be interpreted literally.
It is unnecessary for plain strings.

Incorrect usage of literal can lead to XSS issues, via a malicious user
controlling data which will be rendered in other users' browsers. The data
could either be stored previously in the system or be part of a forged URL
the victim clicks on.

For example, when a user browses to a forged URL where a repository
changeset or branch name contains a javascript snippet, the snippet
was executed when printed on the page using 'literal'.

Remaining uses of 'literal' have been reviewed with no apparent problems
found.

Reported by Bob Hogg <wombat@rwhogg.site> (thanks!).
6 files changed with 11 insertions and 14 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/changelog.py
Show inline comments
 
@@ -38,98 +38,97 @@ from kallithea.lib.auth import LoginRequ
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.helpers import RepoPage
 
from kallithea.lib.compat import json
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.lib.vcs.exceptions import RepositoryError, ChangesetDoesNotExistError,\
 
    ChangesetError, NodeDoesNotExistError, EmptyRepositoryError
 
from kallithea.lib.utils2 import safe_int, safe_str
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _load_changelog_summary():
 
    p = safe_int(request.GET.get('page'), 1)
 
    size = safe_int(request.GET.get('size'), 10)
 

	
 
    def url_generator(**kw):
 
        return url('changelog_summary_home',
 
                   repo_name=c.db_repo.repo_name, size=size, **kw)
 

	
 
    collection = c.db_repo_scm_instance
 

	
 
    c.repo_changesets = RepoPage(collection, page=p,
 
                                 items_per_page=size,
 
                                 url=url_generator)
 
    page_revisions = [x.raw_id for x in list(c.repo_changesets)]
 
    c.comments = c.db_repo.get_comments(page_revisions)
 
    c.statuses = c.db_repo.statuses(page_revisions)
 

	
 

	
 
class ChangelogController(BaseRepoController):
 

	
 
    def __before__(self):
 
        super(ChangelogController, self).__before__()
 
        c.affected_files_cut_off = 60
 

	
 
    @staticmethod
 
    def __get_cs(rev, repo):
 
        """
 
        Safe way to get changeset. If error occur fail with error message.
 

	
 
        :param rev: revision to fetch
 
        :param repo: repo instance
 
        """
 

	
 
        try:
 
            return c.db_repo_scm_instance.get_changeset(rev)
 
        except EmptyRepositoryError as e:
 
            h.flash(h.literal(_('There are no changesets yet')),
 
                    category='error')
 
            h.flash(_('There are no changesets yet'), category='error')
 
        except RepositoryError as e:
 
            log.error(traceback.format_exc())
 
            h.flash(safe_str(e), category='error')
 
        raise HTTPBadRequest()
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
 
                                   'repository.admin')
 
    def index(self, repo_name, revision=None, f_path=None):
 
        # Fix URL after page size form submission via GET
 
        # TODO: Somehow just don't send this extra junk in the GET URL
 
        if request.GET.get('set'):
 
            request.GET.pop('set', None)
 
            if revision is None:
 
                return redirect(url('changelog_home', repo_name=repo_name, **request.GET))
 
            return redirect(url('changelog_file_home', repo_name=repo_name, revision=revision, f_path=f_path, **request.GET))
 

	
 
        limit = 2000
 
        default = 100
 
        if request.GET.get('size'):
 
            c.size = max(min(safe_int(request.GET.get('size')), limit), 1)
 
            session['changelog_size'] = c.size
 
            session.save()
 
        else:
 
            c.size = int(session.get('changelog_size', default))
 
        # min size must be 1
 
        c.size = max(c.size, 1)
 
        p = safe_int(request.GET.get('page', 1), 1)
 
        branch_name = request.GET.get('branch', None)
 
        if (branch_name and
 
            branch_name not in c.db_repo_scm_instance.branches and
 
            branch_name not in c.db_repo_scm_instance.closed_branches and
 
            not revision):
 
            return redirect(url('changelog_file_home', repo_name=c.repo_name,
 
                                    revision=branch_name, f_path=f_path or ''))
 

	
 
        if revision == 'tip':
 
            revision = None
 

	
 
        c.changelog_for_path = f_path
 
        try:
 

	
 
            if f_path:
 
                log.debug('generating changelog for path %s', f_path)
 
                # get the history for the file !
 
                tip_cs = c.db_repo_scm_instance.get_changeset()
 
                try:
 
                    collection = tip_cs.get_file_history(f_path)
kallithea/controllers/pullrequests.py
Show inline comments
 
@@ -190,98 +190,97 @@ class PullrequestsController(BaseRepoCon
 
    @LoginRequired()
 
    @HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
 
                                   'repository.admin')
 
    def show_all(self, repo_name):
 
        c.from_ = request.GET.get('from_') or ''
 
        c.closed = request.GET.get('closed') or ''
 
        c.pull_requests = PullRequestModel().get_all(repo_name, from_=c.from_, closed=c.closed)
 
        c.repo_name = repo_name
 
        p = safe_int(request.GET.get('page', 1), 1)
 

	
 
        c.pullrequests_pager = Page(c.pull_requests, page=p, items_per_page=100)
 

	
 
        return render('/pullrequests/pullrequest_show_all.html')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    def show_my(self):
 
        c.closed = request.GET.get('closed') or ''
 

	
 
        def _filter(pr):
 
            s = sorted(pr, key=lambda o: o.created_on, reverse=True)
 
            if not c.closed:
 
                s = filter(lambda p: p.status != PullRequest.STATUS_CLOSED, s)
 
            return s
 

	
 
        c.my_pull_requests = _filter(PullRequest.query()\
 
                                .filter(PullRequest.user_id ==
 
                                        self.authuser.user_id)\
 
                                .all())
 

	
 
        c.participate_in_pull_requests = _filter(PullRequest.query()\
 
                                .join(PullRequestReviewers)\
 
                                .filter(PullRequestReviewers.user_id ==
 
                                        self.authuser.user_id)\
 
                                                 )
 

	
 
        return render('/pullrequests/pullrequest_show_my.html')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
 
                                   'repository.admin')
 
    def index(self):
 
        org_repo = c.db_repo
 
        org_scm_instance = org_repo.scm_instance
 
        try:
 
            org_scm_instance.get_changeset()
 
        except EmptyRepositoryError as e:
 
            h.flash(h.literal(_('There are no changesets yet')),
 
                    category='warning')
 
            h.flash(_('There are no changesets yet'), category='warning')
 
            redirect(url('summary_home', repo_name=org_repo.repo_name))
 

	
 
        org_rev = request.GET.get('rev_end')
 
        # rev_start is not directly useful - its parent could however be used
 
        # as default for other and thus give a simple compare view
 
        rev_start = request.GET.get('rev_start')
 
        other_rev = None
 
        if rev_start:
 
            starters = org_repo.get_changeset(rev_start).parents
 
            if starters:
 
                other_rev = starters[0].raw_id
 
            else:
 
                other_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        branch = request.GET.get('branch')
 

	
 
        c.cs_repos = [(org_repo.repo_name, org_repo.repo_name)]
 
        c.default_cs_repo = org_repo.repo_name
 
        c.cs_refs, c.default_cs_ref = self._get_repo_refs(org_scm_instance, rev=org_rev, branch=branch)
 

	
 
        default_cs_ref_type, default_cs_branch, default_cs_rev = c.default_cs_ref.split(':')
 
        if default_cs_ref_type != 'branch':
 
            default_cs_branch = org_repo.get_changeset(default_cs_rev).branch
 

	
 
        # add org repo to other so we can open pull request against peer branches on itself
 
        c.a_repos = [(org_repo.repo_name, '%s (self)' % org_repo.repo_name)]
 

	
 
        if org_repo.parent:
 
            # add parent of this fork also and select it.
 
            # use the same branch on destination as on source, if available.
 
            c.a_repos.append((org_repo.parent.repo_name, '%s (parent)' % org_repo.parent.repo_name))
 
            c.a_repo = org_repo.parent
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(
 
                    org_repo.parent.scm_instance, branch=default_cs_branch, rev=other_rev)
 

	
 
        else:
 
            c.a_repo = org_repo
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(org_scm_instance, rev=other_rev)
 

	
 
        # gather forks and add to this list ... even though it is rare to
 
        # request forks to pull from their parent
 
        for fork in org_repo.forks:
 
            c.a_repos.append((fork.repo_name, fork.repo_name))
 

	
 
        return render('/pullrequests/pullrequest.html')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
kallithea/lib/auth.py
Show inline comments
 
@@ -674,97 +674,97 @@ class AuthUser(object):
 
                    _set.add(ip.ip_addr)
 
                except ObjectDeletedError:
 
                    # since we use heavy caching sometimes it happens that we get
 
                    # deleted objects here, we just skip them
 
                    pass
 

	
 
        user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
 
        if cache:
 
            user_ips = user_ips.options(FromCache("sql_cache_short",
 
                                                  "get_user_ips_%s" % user_id))
 

	
 
        for ip in user_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 
        return _set or set(['0.0.0.0/0', '::/0'])
 

	
 

	
 
def set_available_permissions(config):
 
    """
 
    This function will propagate pylons globals with all available defined
 
    permission given in db. We don't want to check each time from db for new
 
    permissions since adding a new permission also requires application restart
 
    ie. to decorate new views with the newly created permission
 

	
 
    :param config: current pylons config instance
 

	
 
    """
 
    log.info('getting information about all available permissions')
 
    try:
 
        sa = meta.Session
 
        all_perms = sa.query(Permission).all()
 
        config['available_permissions'] = [x.permission_name for x in all_perms]
 
    finally:
 
        meta.Session.remove()
 

	
 

	
 
#==============================================================================
 
# CHECK DECORATORS
 
#==============================================================================
 

	
 
def redirect_to_login(message=None):
 
    from kallithea.lib import helpers as h
 
    p = request.path_qs
 
    if message:
 
        h.flash(h.literal(message), category='warning')
 
        h.flash(message, category='warning')
 
    log.debug('Redirecting to login page, origin: %s', p)
 
    return redirect(url('login_home', came_from=p))
 

	
 

	
 
class LoginRequired(object):
 
    """
 
    Must be logged in to execute this function else
 
    redirect to login page
 

	
 
    :param api_access: if enabled this checks only for valid auth token
 
        and grants access based on valid token
 
    """
 

	
 
    def __init__(self, api_access=False):
 
        self.api_access = api_access
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        controller = fargs[0]
 
        user = controller.authuser
 
        loc = "%s:%s" % (controller.__class__.__name__, func.__name__)
 
        log.debug('Checking access for user %s @ %s', user, loc)
 

	
 
        if not AuthUser.check_ip_allowed(user, controller.ip_addr):
 
            return redirect_to_login(_('IP %s not allowed') % controller.ip_addr)
 

	
 
        # check if we used an API key and it's a valid one
 
        api_key = request.GET.get('api_key')
 
        if api_key is not None:
 
            # explicit controller is enabled or API is in our whitelist
 
            if self.api_access or allowed_api_access(loc, api_key=api_key):
 
                if api_key in user.api_keys:
 
                    log.info('user %s authenticated with API key ****%s @ %s',
 
                             user, api_key[-4:], loc)
 
                    return func(*fargs, **fkwargs)
 
                else:
 
                    log.warning('API key ****%s is NOT valid', api_key[-4:])
 
                    return redirect_to_login(_('Invalid API key'))
 
            else:
 
                # controller does not allow API access
 
                log.warning('API access to %s is not allowed', loc)
 
                return abort(403)
 

	
 
        # Only allow the following HTTP request methods. (We sometimes use POST
 
        # requests with a '_method' set to 'PUT' or 'DELETE'; but that is only
 
        # used for the route lookup, and does not affect request.method.)
kallithea/lib/base.py
Show inline comments
 
@@ -442,99 +442,98 @@ class BaseController(WSGIController):
 
            log.info('IP: %s User: %s accessed %s',
 
                self.ip_addr, self.authuser,
 
                safe_unicode(_get_access_path(environ)),
 
            )
 
            return WSGIController.__call__(self, environ, start_response)
 
        finally:
 
            meta.Session.remove()
 

	
 

	
 
class BaseRepoController(BaseController):
 
    """
 
    Base class for controllers responsible for loading all needed data for
 
    repository loaded items are
 

	
 
    c.db_repo_scm_instance: instance of scm repository
 
    c.db_repo: instance of db
 
    c.repository_followers: number of followers
 
    c.repository_forks: number of forks
 
    c.repository_following: weather the current user is following the current repo
 
    """
 

	
 
    def __before__(self):
 
        super(BaseRepoController, self).__before__()
 
        if c.repo_name:  # extracted from routes
 
            _dbr = Repository.get_by_repo_name(c.repo_name)
 
            if not _dbr:
 
                return
 

	
 
            log.debug('Found repository in database %s with state `%s`',
 
                      safe_unicode(_dbr), safe_unicode(_dbr.repo_state))
 
            route = getattr(request.environ.get('routes.route'), 'name', '')
 

	
 
            # allow to delete repos that are somehow damages in filesystem
 
            if route in ['delete_repo']:
 
                return
 

	
 
            if _dbr.repo_state in [Repository.STATE_PENDING]:
 
                if route in ['repo_creating_home']:
 
                    return
 
                check_url = url('repo_creating_home', repo_name=c.repo_name)
 
                return redirect(check_url)
 

	
 
            dbr = c.db_repo = _dbr
 
            c.db_repo_scm_instance = c.db_repo.scm_instance
 
            if c.db_repo_scm_instance is None:
 
                log.error('%s this repository is present in database but it '
 
                          'cannot be created as an scm instance', c.repo_name)
 
                from kallithea.lib import helpers as h
 
                h.flash(h.literal(_('Repository not found in the filesystem')),
 
                h.flash(_('Repository not found in the filesystem'),
 
                        category='error')
 
                raise paste.httpexceptions.HTTPNotFound()
 

	
 
            # some globals counter for menu
 
            c.repository_followers = self.scm_model.get_followers(dbr)
 
            c.repository_forks = self.scm_model.get_forks(dbr)
 
            c.repository_pull_requests = self.scm_model.get_pull_requests(dbr)
 
            c.repository_following = self.scm_model.is_following_repo(
 
                                    c.repo_name, self.authuser.user_id)
 

	
 
    @staticmethod
 
    def _get_ref_rev(repo, ref_type, ref_name, returnempty=False):
 
        """
 
        Safe way to get changeset. If error occurs show error.
 
        """
 
        from kallithea.lib import helpers as h
 
        try:
 
            return repo.scm_instance.get_ref_revision(ref_type, ref_name)
 
        except EmptyRepositoryError as e:
 
            if returnempty:
 
                return repo.scm_instance.EMPTY_CHANGESET
 
            h.flash(h.literal(_('There are no changesets yet')),
 
                    category='error')
 
            h.flash(_('There are no changesets yet'), category='error')
 
            raise webob.exc.HTTPNotFound()
 
        except ChangesetDoesNotExistError as e:
 
            h.flash(h.literal(_('Changeset for %s %s not found in %s') %
 
                              (ref_type, ref_name, repo.repo_name)),
 
            h.flash(_('Changeset for %s %s not found in %s') %
 
                              (ref_type, ref_name, repo.repo_name),
 
                    category='error')
 
            raise webob.exc.HTTPNotFound()
 
        except RepositoryError as e:
 
            log.error(traceback.format_exc())
 
            h.flash(safe_str(e), category='error')
 
            raise webob.exc.HTTPBadRequest()
 

	
 

	
 
class WSGIResultCloseCallback(object):
 
    """Wrap a WSGI result and let close call close after calling the
 
    close method on the result.
 
    """
 
    def __init__(self, result, close):
 
        self._result = result
 
        self._close = close
 

	
 
    def __iter__(self):
 
        return iter(self._result)
 

	
 
    def close(self):
 
        if hasattr(self._result, 'close'):
 
            self._result.close()
 
        self._close()
kallithea/templates/admin/settings/settings_system_update.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
## upgrade block rendered afte on-click check
 

	
 
<div class="alert ${'alert-warning' if c.should_upgrade else 'alert-success'}">
 
<p style="padding: 2px 0px 5px 0px; margin: 0px">
 

	
 
%if c.should_upgrade:
 
    A <b>new version</b> is available:
 
    %if c.latest_data.get('title'):
 
        <b>${h.literal(c.latest_data['title'])}</b>
 
        <b>${c.latest_data['title']}</b>
 
    %else:
 
        <b>${c.latest_ver}</b>
 
    %endif
 
%else:
 
    You already have the <b>latest</b> stable version.
 
%endif
 
</p>
 

	
 
% if c.should_upgrade and c.important_notices:
 
<div style="color: #5f5f5f; padding: 3px 0px 5px 0px;">Important notes for this release:</div>
 
    <ul>
 
    % for notice in c.important_notices:
 
        <li>- ${notice}</li>
 
    % endfor
 
    </ul>
 
% endif
 
</div>
kallithea/templates/base/default_perms_box.html
Show inline comments
 
## snippet for displaying default permission box
 
## usage:
 
##    <%namespace name="dpb" file="/base/default_perms_box.html"/>
 
##    ${dpb.default_perms_box(<url_to_form>)}
 

	
 

	
 
<%def name="default_perms_box(form_url)">
 
${h.form(form_url, method='put')}
 
    <div class="form">
 
        <!-- fields -->
 
        <div class="fields">
 
             <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="inherit_default_permissions">${_('Inherit defaults')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('inherit_default_permissions',value=True)}
 
                    <span class="help-block">
 
                    ${h.literal(_('Select to inherit global settings, IP whitelist and permissions from the %s.')
 
                                % h.link_to('default permissions', url('admin_permissions')))}
 
                    </span>
 
                </div>
 
             </div>
 

	
 
             <div id="inherit_overlay">
 
             <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="create_repo_perm">${_('Create repositories')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('create_repo_perm',value=True)}
 
                    <span class="help-block">
 
                    ${h.literal(_('Select this option to allow repository creation for this user'))}
 
                    ${_('Select this option to allow repository creation for this user')}
 
                    </span>
 
                </div>
 
             </div>
 

	
 
             <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="create_user_group_perm">${_('Create user groups')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('create_user_group_perm',value=True)}
 
                    <span class="help-block">
 
                        ${h.literal(_('Select this option to allow user group creation for this user'))}
 
                        ${_('Select this option to allow user group creation for this user')}
 
                    </span>
 
                </div>
 
             </div>
 

	
 
             <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="fork_repo_perm">${_('Fork repositories')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    ${h.checkbox('fork_repo_perm',value=True)}
 
                    <span class="help-block">
 
                        ${h.literal(_('Select this option to allow repository forking for this user'))}
 
                        ${_('Select this option to allow repository forking for this user')}
 
                    </span>
 
                </div>
 
             </div>
 

	
 
            </div>
 
            <div class="buttons">
 
              ${h.submit('save',_('Save'),class_="btn")}
 
              ${h.reset('reset',_('Reset'),class_="btn")}
 
            </div>
 
        </div>
 
    </div>
 
${h.end_form()}
 

	
 
## JS
 
<script>
 
$(document).ready(function(e){
 
    var show_custom_perms = function(inherit_default){
 
        if(inherit_default){
 
            $('#inherit_overlay').hide();
 
        }else{
 
            $('#inherit_overlay').show();
 
        }
 
    };
 

	
 
    show_custom_perms($('#inherit_default_permissions').prop('checked'));
 
    $('#inherit_default_permissions').change(function(){
 
        show_custom_perms($('#inherit_default_permissions').prop('checked'));
 
    });
 
});
 
</script>
 

	
 
</%def>
0 comments (0 inline, 0 general)