Changeset - 20bf8e618bc9
[Not reviewed]
default
0 3 0
domruf - 10 years ago 2016-03-02 18:58:25
dominikruf@gmail.com
windows: node paths should always use posixpath - avoid \ in repo internal paths on windows

Using \ on Windows would result in a wrong node path which could not be checked
out with Git on Windows.
3 files changed with 29 insertions and 26 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/files.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.files
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Files controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import posixpath
 
import logging
 
import traceback
 
import tempfile
 
import shutil
 

	
 
from pylons import request, response, tmpl_context as c, url
 
from pylons.i18n.translation import _
 
from webob.exc import HTTPFound
 

	
 
from kallithea.lib.utils import jsonify, action_logger
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 

	
 
from kallithea.lib.compat import OrderedDict
 
from kallithea.lib.utils2 import convert_line_endings, detect_mode, safe_str, \
 
    str2bool
 
from kallithea.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import RepositoryError, \
 
    ChangesetDoesNotExistError, EmptyRepositoryError, \
 
    ImproperArchiveTypeError, VCSError, NodeAlreadyExistsError, \
 
    NodeDoesNotExistError, ChangesetError, NodeError
 
from kallithea.lib.vcs.nodes import FileNode
 

	
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.db import Repository
 

	
 
from kallithea.controllers.changeset import anchor_url, _ignorews_url, \
 
    _context_url, get_line_ctx, get_ignore_ws
 
from webob.exc import HTTPNotFound
 
from kallithea.lib.exceptions import NonRelativePathError
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FilesController(BaseRepoController):
 

	
 
    def __before__(self):
 
        super(FilesController, self).__before__()
 
        c.cut_off_limit = self.cut_off_limit
 

	
 
    def __get_cs(self, rev, silent_empty=False):
 
        """
 
        Safe way to get changeset if error occur it redirects to tip with
 
        proper message
 

	
 
        :param rev: revision to fetch
 
        :silent_empty: return None if repository is empty
 
        """
 

	
 
        try:
 
            return c.db_repo_scm_instance.get_changeset(rev)
 
        except EmptyRepositoryError as e:
 
            if silent_empty:
 
                return None
 
            url_ = url('files_add_home',
 
                       repo_name=c.repo_name,
 
                       revision=0, f_path='', anchor='edit')
 
            add_new = h.link_to(_('Click here to add new file'), url_, class_="alert-link")
 
            h.flash(h.literal(_('There are no files yet. %s') % add_new),
 
                    category='warning')
 
            raise HTTPNotFound()
 
        except (ChangesetDoesNotExistError, LookupError):
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(safe_str(e), category='error')
 
            raise HTTPNotFound()
 

	
 
    def __get_filenode(self, cs, path):
 
        """
 
        Returns file_node or raise HTTP error.
 

	
 
        :param cs: given changeset
 
        :param path: path to lookup
 
        """
 

	
 
        try:
 
            file_node = cs.get_node(path)
 
            if file_node.is_dir():
 
                raise RepositoryError('given path is a directory')
 
        except ChangesetDoesNotExistError:
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(safe_str(e), category='error')
 
            raise HTTPNotFound()
 

	
 
        return file_node
 

	
 
@@ -378,193 +379,193 @@ class FilesController(BaseRepoController
 
            h.flash(_('You can only edit files with revision '
 
                      'being a valid branch'), category='warning')
 
            raise HTTPFound(location=h.url('files_home',
 
                                  repo_name=repo_name, revision='tip',
 
                                  f_path=f_path))
 

	
 
        r_post = request.POST
 

	
 
        c.cs = self.__get_cs(revision)
 
        c.file = self.__get_filenode(c.cs, f_path)
 

	
 
        if c.file.is_binary:
 
            raise HTTPFound(location=url('files_home', repo_name=c.repo_name,
 
                            revision=c.cs.raw_id, f_path=f_path))
 
        c.default_message = _('Edited file %s via Kallithea') % (f_path)
 
        c.f_path = f_path
 

	
 
        if r_post:
 

	
 
            old_content = c.file.content
 
            sl = old_content.splitlines(1)
 
            first_line = sl[0] if sl else ''
 
            # modes:  0 - Unix, 1 - Mac, 2 - DOS
 
            mode = detect_mode(first_line, 0)
 
            content = convert_line_endings(r_post.get('content', ''), mode)
 

	
 
            message = r_post.get('message') or c.default_message
 
            author = self.authuser.full_contact
 

	
 
            if content == old_content:
 
                h.flash(_('No changes'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            try:
 
                self.scm_model.commit_change(repo=c.db_repo_scm_instance,
 
                                             repo_name=repo_name, cs=c.cs,
 
                                             user=self.authuser.user_id,
 
                                             author=author, message=message,
 
                                             content=content, f_path=f_path)
 
                h.flash(_('Successfully committed to %s') % f_path,
 
                        category='success')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during commit'), category='error')
 
            raise HTTPFound(location=url('changeset_home',
 
                                repo_name=c.repo_name, revision='tip'))
 

	
 
        return render('files/files_edit.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionAnyDecorator('repository.write', 'repository.admin')
 
    def add(self, repo_name, revision, f_path):
 

	
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo.enable_locking and repo.locked[0]:
 
            h.flash(_('This repository has been locked by %s on %s')
 
                % (h.person_by_id(repo.locked[0]),
 
                   h.fmt_date(h.time_to_datetime(repo.locked[1]))),
 
                  'warning')
 
            raise HTTPFound(location=h.url('files_home',
 
                                  repo_name=repo_name, revision='tip'))
 

	
 
        r_post = request.POST
 
        c.cs = self.__get_cs(revision, silent_empty=True)
 
        if c.cs is None:
 
            c.cs = EmptyChangeset(alias=c.db_repo_scm_instance.alias)
 
        c.default_message = (_('Added file via Kallithea'))
 
        c.f_path = f_path
 

	
 
        if r_post:
 
            unix_mode = 0
 
            content = convert_line_endings(r_post.get('content', ''), unix_mode)
 

	
 
            message = r_post.get('message') or c.default_message
 
            filename = r_post.get('filename')
 
            location = r_post.get('location', '')
 
            file_obj = r_post.get('upload_file', None)
 

	
 
            if file_obj is not None and hasattr(file_obj, 'filename'):
 
                filename = file_obj.filename
 
                content = file_obj.file
 

	
 
                if hasattr(content, 'file'):
 
                    # non posix systems store real file under file attr
 
                    content = content.file
 

	
 
            if not content:
 
                h.flash(_('No content'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            if not filename:
 
                h.flash(_('No filename'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            #strip all crap out of file, just leave the basename
 
            filename = os.path.basename(filename)
 
            node_path = os.path.join(location, filename)
 
            node_path = posixpath.join(location, filename)
 
            author = self.authuser.full_contact
 

	
 
            try:
 
                nodes = {
 
                    node_path: {
 
                        'content': content
 
                    }
 
                }
 
                self.scm_model.create_nodes(
 
                    user=c.authuser.user_id, repo=c.db_repo,
 
                    message=message,
 
                    nodes=nodes,
 
                    parent_cs=c.cs,
 
                    author=author,
 
                )
 

	
 
                h.flash(_('Successfully committed to %s') % node_path,
 
                        category='success')
 
            except NonRelativePathError as e:
 
                h.flash(_('Location must be relative path and must not '
 
                          'contain .. in path'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            except (NodeError, NodeAlreadyExistsError) as e:
 
                h.flash(_(e), category='error')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during commit'), category='error')
 
            raise HTTPFound(location=url('changeset_home',
 
                                repo_name=c.repo_name, revision='tip'))
 

	
 
        return render('files/files_add.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
 
                                   'repository.admin')
 
    def archivefile(self, repo_name, fname):
 
        fileformat = None
 
        revision = None
 
        ext = None
 
        subrepos = request.GET.get('subrepos') == 'true'
 

	
 
        for a_type, ext_data in settings.ARCHIVE_SPECS.items():
 
            archive_spec = fname.split(ext_data[1])
 
            if len(archive_spec) == 2 and archive_spec[1] == '':
 
                fileformat = a_type or ext_data[1]
 
                revision = archive_spec[0]
 
                ext = ext_data[1]
 

	
 
        try:
 
            dbrepo = RepoModel().get_by_repo_name(repo_name)
 
            if not dbrepo.enable_downloads:
 
                return _('Downloads disabled') # TODO: do something else?
 

	
 
            if c.db_repo_scm_instance.alias == 'hg':
 
                # patch and reset hooks section of UI config to not run any
 
                # hooks on fetching archives with subrepos
 
                for k, v in c.db_repo_scm_instance._repo.ui.configitems('hooks'):
 
                    c.db_repo_scm_instance._repo.ui.setconfig('hooks', k, None)
 

	
 
            cs = c.db_repo_scm_instance.get_changeset(revision)
 
            content_type = settings.ARCHIVE_SPECS[fileformat][0]
 
        except ChangesetDoesNotExistError:
 
            return _('Unknown revision %s') % revision
 
        except EmptyRepositoryError:
 
            return _('Empty repository')
 
        except (ImproperArchiveTypeError, KeyError):
 
            return _('Unknown archive type')
 

	
 
        from kallithea import CONFIG
 
        rev_name = cs.raw_id[:12]
 
        archive_name = '%s-%s%s' % (safe_str(repo_name.replace('/', '_')),
 
                                    safe_str(rev_name), ext)
 

	
 
        archive_path = None
 
        cached_archive_path = None
 
        archive_cache_dir = CONFIG.get('archive_cache_dir')
 
        if archive_cache_dir and not subrepos: # TOOD: subrepo caching?
 
            if not os.path.isdir(archive_cache_dir):
 
                os.makedirs(archive_cache_dir)
 
            cached_archive_path = os.path.join(archive_cache_dir, archive_name)
 
            if os.path.isfile(cached_archive_path):
 
                log.debug('Found cached archive in %s', cached_archive_path)
 
                archive_path = cached_archive_path
 
            else:
 
                log.debug('Archive %s is not yet cached', archive_name)
 

	
 
        if archive_path is None:
 
            # generate new archive
 
            fd, archive_path = tempfile.mkstemp()
 
            log.debug('Creating new temp archive in %s', archive_path)
 
            with os.fdopen(fd, 'wb') as stream:
 
                cs.fill_archive(stream=stream, kind=fileformat, subrepos=subrepos)
 
                # stream (and thus fd) has been closed by cs.fill_archive
 
            if cached_archive_path is not None:
 
                # we generated the archive - move it to cache
kallithea/model/scm.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.scm
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Scm model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import posixpath
 
import re
 
import time
 
import traceback
 
import logging
 
import cStringIO
 
import pkg_resources
 
from os.path import join as jn
 

	
 
from sqlalchemy import func
 
from pylons.i18n.translation import _
 

	
 
import kallithea
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea import BACKENDS
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url, \
 
    _set_extras
 
from kallithea.lib.auth import HasRepoPermissionAny, HasRepoGroupPermissionAny, \
 
    HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAll
 
from kallithea.lib.utils import get_filesystem_repos, make_ui, \
 
    action_logger
 
from kallithea.model import BaseModel
 
from kallithea.model.db import Repository, Ui, CacheInvalidation, \
 
    UserFollowing, UserLog, User, RepoGroup, PullRequest
 
from kallithea.lib.hooks import log_push_action
 
from kallithea.lib.exceptions import NonRelativePathError, IMCCommitError
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserTemp(object):
 
    def __init__(self, user_id):
 
        self.user_id = user_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.user_id)
 

	
 

	
 
class RepoTemp(object):
 
    def __init__(self, repo_id):
 
        self.repo_id = repo_id
 

	
 
    def __repr__(self):
 
        return "<%s('id:%s')>" % (self.__class__.__name__, self.repo_id)
 

	
 

	
 
class CachedRepoList(object):
 
    """
 
    Cached repo list. Uses super-fast in-memory cache after initialization.
 
    """
 

	
 
    def __init__(self, db_repo_list, repos_path, order_by=None, perm_set=None):
 
        self.db_repo_list = db_repo_list
 
        self.repos_path = repos_path
 
        self.order_by = order_by
 
        self.reversed = (order_by or '').startswith('-')
 
        if not perm_set:
 
            perm_set = ['repository.read', 'repository.write',
 
                        'repository.admin']
 
        self.perm_set = perm_set
 

	
 
    def __len__(self):
 
        return len(self.db_repo_list)
 

	
 
    def __repr__(self):
 
        return '<%s (%s)>' % (self.__class__.__name__, self.__len__())
 

	
 
    def __iter__(self):
 
        # pre-propagated valid_cache_keys to save executing select statements
 
        # for each repo
 
        valid_cache_keys = CacheInvalidation.get_valid_cache_keys()
 

	
 
        for dbr in self.db_repo_list:
 
            scmr = dbr.scm_instance_cached(valid_cache_keys)
 
            # check permission at this level
 
            if not HasRepoPermissionAny(
 
                *self.perm_set)(dbr.repo_name, 'get repo check'):
 
                continue
 

	
 
            try:
 
                last_change = scmr.last_change
 
                tip = h.get_changeset_safe(scmr, 'tip')
 
            except Exception:
 
                log.error(
 
                    '%s this repository is present in database but it '
 
                    'cannot be created as an scm instance, org_exc:%s'
 
                    % (dbr.repo_name, traceback.format_exc())
 
                )
 
                continue
 

	
 
            tmp_d = {}
 
@@ -482,193 +483,193 @@ class ScmModel(BaseModel):
 
        _scm_repo = repo._repo
 
        # trigger push hook
 
        if repo.alias == 'hg':
 
            log_push_action(_scm_repo.ui, _scm_repo, node=revisions[0])
 
        elif repo.alias == 'git':
 
            log_push_action(None, _scm_repo, _git_revs=revisions)
 

	
 
    def _get_IMC_module(self, scm_type):
 
        """
 
        Returns InMemoryCommit class based on scm_type
 

	
 
        :param scm_type:
 
        """
 
        if scm_type == 'hg':
 
            from kallithea.lib.vcs.backends.hg import MercurialInMemoryChangeset
 
            return MercurialInMemoryChangeset
 

	
 
        if scm_type == 'git':
 
            from kallithea.lib.vcs.backends.git import GitInMemoryChangeset
 
            return GitInMemoryChangeset
 

	
 
        raise Exception('Invalid scm_type, must be one of hg,git got %s'
 
                        % (scm_type,))
 

	
 
    def pull_changes(self, repo, username):
 
        """
 
        Pull from "clone URL".
 
        """
 
        dbrepo = self.__get_repo(repo)
 
        clone_uri = dbrepo.clone_uri
 
        if not clone_uri:
 
            raise Exception("This repository doesn't have a clone uri")
 

	
 
        repo = dbrepo.scm_instance
 
        repo_name = dbrepo.repo_name
 
        try:
 
            if repo.alias == 'git':
 
                repo.fetch(clone_uri)
 
                # git doesn't really have something like post-fetch action
 
                # we fake that now. #TODO: extract fetched revisions somehow
 
                # here
 
                self._handle_push(repo,
 
                                  username=username,
 
                                  action='push_remote',
 
                                  repo_name=repo_name,
 
                                  revisions=[])
 
            else:
 
                self._handle_rc_scm_extras(username, dbrepo.repo_name,
 
                                           repo.alias, action='push_remote')
 
                repo.pull(clone_uri)
 

	
 
            self.mark_for_invalidation(repo_name)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def commit_change(self, repo, repo_name, cs, user, author, message,
 
                      content, f_path):
 
        """
 
        Commit a change to a single file
 

	
 
        :param repo: a db_repo.scm_instance
 
        """
 
        user = self._get_user(user)
 
        IMC = self._get_IMC_module(repo.alias)
 

	
 
        # decoding here will force that we have proper encoded values
 
        # in any other case this will throw exceptions and deny commit
 
        content = safe_str(content)
 
        path = safe_str(f_path)
 
        # message and author needs to be unicode
 
        # proper backend should then translate that into required type
 
        message = safe_unicode(message)
 
        author = safe_unicode(author)
 
        imc = IMC(repo)
 
        imc.change(FileNode(path, content, mode=cs.get_file_mode(f_path)))
 
        try:
 
            tip = imc.commit(message=message, author=author,
 
                             parents=[cs], branch=cs.branch)
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            raise IMCCommitError(str(e))
 
        finally:
 
            # always clear caches, if commit fails we want fresh object also
 
            self.mark_for_invalidation(repo_name)
 
        self._handle_push(repo,
 
                          username=user.username,
 
                          action='push_local',
 
                          repo_name=repo_name,
 
                          revisions=[tip.raw_id])
 
        return tip
 

	
 
    def _sanitize_path(self, f_path):
 
        if f_path.startswith('/') or f_path.startswith('.') or '../' in f_path:
 
            raise NonRelativePathError('%s is not an relative path' % f_path)
 
        if f_path:
 
            f_path = os.path.normpath(f_path)
 
            f_path = posixpath.normpath(f_path)
 
        return f_path
 

	
 
    def get_nodes(self, repo_name, revision, root_path='/', flat=True):
 
        """
 
        Recursively walk root dir and return a set of all paths found.
 

	
 
        :param repo_name: name of repository
 
        :param revision: revision for which to list nodes
 
        :param root_path: root path to list
 
        :param flat: return as a list, if False returns a dict with description
 

	
 
        """
 
        _files = list()
 
        _dirs = list()
 
        try:
 
            _repo = self.__get_repo(repo_name)
 
            changeset = _repo.scm_instance.get_changeset(revision)
 
            root_path = root_path.lstrip('/')
 
            for topnode, dirs, files in changeset.walk(root_path):
 
                for f in files:
 
                    _files.append(f.path if flat else {"name": f.path,
 
                                                       "type": "file"})
 
                for d in dirs:
 
                    _dirs.append(d.path if flat else {"name": d.path,
 
                                                      "type": "dir"})
 
        except RepositoryError:
 
            log.debug(traceback.format_exc())
 
            raise
 

	
 
        return _dirs, _files
 

	
 
    def create_nodes(self, user, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Commits specified nodes to repo.
 

	
 
        :param user: Kallithea User object or user_id, the committer
 
        :param repo: Kallithea Repository object
 
        :param message: commit message
 
        :param nodes: mapping {filename:{'content':content},...}
 
        :param parent_cs: parent changeset, can be empty than it's initial commit
 
        :param author: author of commit, cna be different that committer only for git
 
        :param trigger_push_hook: trigger push hooks
 

	
 
        :returns: new committed changeset
 
        """
 

	
 
        user = self._get_user(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        processed_nodes = []
 
        for f_path in nodes:
 
            content = nodes[f_path]['content']
 
            f_path = self._sanitize_path(f_path)
 
            f_path = safe_str(f_path)
 
            # decoding here will force that we have proper encoded values
 
            # in any other case this will throw exceptions and deny commit
 
            if isinstance(content, (basestring,)):
 
                content = safe_str(content)
 
            elif isinstance(content, (file, cStringIO.OutputType,)):
 
                content = content.read()
 
            else:
 
                raise Exception('Content is of unrecognized type %s' % (
 
                    type(content)
 
                ))
 
            processed_nodes.append((f_path, content))
 

	
 
        message = safe_unicode(message)
 
        committer = user.full_contact
 
        author = safe_unicode(author) if author else committer
 

	
 
        IMC = self._get_IMC_module(scm_instance.alias)
 
        imc = IMC(scm_instance)
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 
        # add multiple nodes
 
        for path, content in processed_nodes:
 
            imc.add(FileNode(path, content=content))
 

	
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        self.mark_for_invalidation(repo.repo_name)
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              action='push_local',
kallithea/tests/functional/test_files.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import os
 
import posixpath
 
from kallithea.tests import *
 
from kallithea.model.db import Repository
 
from kallithea.model.meta import Session
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 
ARCHIVE_SPECS = {
 
    '.tar.bz2': ('application/x-bzip2', 'tbz2', ''),
 
    '.tar.gz': ('application/x-gzip', 'tgz', ''),
 
    '.zip': ('application/zip', 'zip', ''),
 
}
 

	
 
HG_NODE_HISTORY = fixture.load_resource('hg_node_history_response.json')
 
GIT_NODE_HISTORY = fixture.load_resource('git_node_history_response.json')
 

	
 

	
 
def _set_downloads(repo_name, set_to):
 
    repo = Repository.get_by_repo_name(repo_name)
 
    repo.enable_downloads = set_to
 
    Session().add(repo)
 
    Session().commit()
 

	
 

	
 
class TestFilesController(TestControllerPytest):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='/'))
 
        # Test response...
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/docs"><i class="icon-folder-open"></i><span>docs</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/vcs"><i class="icon-folder-open"></i><span>vcs</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.gitignore"><i class="icon-doc"></i><span>.gitignore</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.hgignore"><i class="icon-doc"></i><span>.hgignore</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.hgtags"><i class="icon-doc"></i><span>.hgtags</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/.travis.yml"><i class="icon-doc"></i><span>.travis.yml</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/MANIFEST.in"><i class="icon-doc"></i><span>MANIFEST.in</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/README.rst"><i class="icon-doc"></i><span>README.rst</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/run_test_and_report.sh"><i class="icon-doc"></i><span>run_test_and_report.sh</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/setup.cfg"><i class="icon-doc"></i><span>setup.cfg</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/setup.py"><i class="icon-doc"></i><span>setup.py</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/test_and_report.sh"><i class="icon-doc"></i><span>test_and_report.sh</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/96507bd11ecc815ebc6270fdf6db110928c09c1e/tox.ini"><i class="icon-doc"></i><span>tox.ini</span></a>' % HG_REPO)
 

	
 
    def test_index_revision(self):
 
        self.log_user()
 

	
 
        response = self.app.get(
 
            url(controller='files', action='index',
 
                repo_name=HG_REPO,
 
                revision='7ba66bec8d6dbba14a2155be32408c435c5f4492',
 
                f_path='/')
 
        )
 

	
 
        #Test response...
 

	
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/7ba66bec8d6dbba14a2155be32408c435c5f4492/docs"><i class="icon-folder-open"></i><span>docs</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/%s/files/7ba66bec8d6dbba14a2155be32408c435c5f4492/tests"><i class="icon-folder-open"></i><span>tests</span></a>' % HG_REPO)
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/%s/files/7ba66bec8d6dbba14a2155be32408c435c5f4492/README.rst"><i class="icon-doc"></i><span>README.rst</span></a>' % HG_REPO)
 
        response.mustcontain('1.1 KiB')
 

	
 
    def test_index_different_branch(self):
 
        self.log_user()
 

	
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='97e8b885c04894463c51898e14387d80c30ed1ee',
 
                                    f_path='/'))
 

	
 
        response.mustcontain("""<option selected="selected" value="97e8b885c04894463c51898e14387d80c30ed1ee">git at 97e8b885c048</option>""")
 

	
 
    def test_index_paging(self):
 
        self.log_user()
 

	
 
        for r in [(73, 'a066b25d5df7016b45a41b7e2a78c33b57adc235'),
 
                  (92, 'cc66b61b8455b264a7a8a2d8ddc80fcfc58c221e'),
 
                  (109, '75feb4c33e81186c87eac740cee2447330288412'),
 
                  (1, '3d8f361e72ab303da48d799ff1ac40d5ac37c67e'),
 
                  (0, 'b986218ba1c9b0d6a259fac9b050b1724ed8e545')]:
 

	
 
            response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision=r[1],
 
                                    f_path='/'))
 

	
 
            response.mustcontain("""@ r%s:%s""" % (r[0], r[1][:12]))
 

	
 
    def test_file_source(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='8911406ad776fdd3d0b9932a2e89677e57405a48',
 
                                    f_path='vcs/nodes.py'))
 
@@ -294,469 +295,469 @@ removed extra unicode conversion in diff
 
        response.mustcontain(msg)
 

	
 
    def test_raw_wrong_f_path(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/ERRORnodes.py'
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path), status=404)
 
        msg = "There is no file nor directory at the given path: &#39;%s&#39; at revision %s" % (f_path, rev[:12])
 
        response.mustcontain(msg)
 

	
 
    def test_ajaxed_files_list(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        response = self.app.get(
 
            url('files_nodelist_home', repo_name=HG_REPO, f_path='/',
 
                revision=rev),
 
            extra_environ={'HTTP_X_PARTIAL_XHR': '1'},
 
        )
 
        response.mustcontain("vcs/web/simplevcs/views/repository.py")
 

	
 
    # Hg - ADD FILE
 
    def test_add_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'))
 

	
 
    def test_add_file_into_hg_missing_content(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': '',
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No content')
 

	
 
    def test_add_file_into_hg_missing_filename(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No filename')
 

	
 
    @parametrize('location,filename', [
 
        ('/abs', 'foo'),
 
        ('../rel', 'foo'),
 
        ('file/../foo', 'foo'),
 
    ])
 
    def test_add_file_into_hg_bad_filenames(self, location, filename):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'Location must be relative path and must not contain .. in path')
 

	
 
    @parametrize('cnt,location,filename', [
 
        (1, '', 'foo.txt'),
 
        (2, 'dir', 'foo.rst'),
 
        (3, 'rel/dir', 'foo.bar'),
 
    ])
 
    def test_add_file_into_hg(self, cnt, location, filename):
 
        self.log_user()
 
        repo = fixture.create_repo(u'commit-test-%s' % cnt, repo_type='hg')
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - add file
 
    def test_add_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'))
 

	
 
    def test_add_file_into_git_missing_content(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                     'content': '',
 
                                     '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        self.checkSessionFlash(response, 'No content')
 

	
 
    def test_add_file_into_git_missing_filename(self):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'No filename')
 

	
 
    @parametrize('location,filename', [
 
        ('/abs', 'foo'),
 
        ('../rel', 'foo'),
 
        ('file/../foo', 'foo'),
 
    ])
 
    def test_add_file_into_git_bad_filenames(self, location, filename):
 
        self.log_user()
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 

	
 
        self.checkSessionFlash(response, 'Location must be relative path and must not contain .. in path')
 

	
 
    @parametrize('cnt,location,filename', [
 
        (1, '', 'foo.txt'),
 
        (2, 'dir', 'foo.rst'),
 
        (3, 'rel/dir', 'foo.bar'),
 
    ])
 
    def test_add_file_into_git(self, cnt, location, filename):
 
        self.log_user()
 
        repo = fixture.create_repo(u'commit-test-%s' % cnt, repo_type='git')
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "foo",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Hg - EDIT
 
    def test_edit_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_edit_home',
 
                                      repo_name=HG_REPO,
 
                                      revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_edit_file_view_not_on_branch_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
            response = self.app.get(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path='vcs/nodes.py'),
 
                                          revision='tip', f_path=posixpath.join(location, filename)),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only edit files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_edit_file_view_commit_changes_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
            response = self.app.post(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path='vcs/nodes.py'),
 
                                          f_path=posixpath.join(location, filename)),
 
                                     params={
 
                                        'content': "def py():\n print 'hello world'\n",
 
                                        'message': 'i commited',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully committed to vcs/nodes.py')
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - edit
 
    def test_edit_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_edit_home',
 
                                      repo_name=GIT_REPO,
 
                                      revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_edit_file_view_not_on_branch_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
            response = self.app.get(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path='vcs/nodes.py'),
 
                                          revision='tip', f_path=posixpath.join(location, filename)),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only edit files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_edit_file_view_commit_changes_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-edit-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
            response = self.app.post(url('files_edit_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path='vcs/nodes.py'),
 
                                          f_path=posixpath.join(location, filename)),
 
                                     params={
 
                                        'content': "def py():\n print 'hello world'\n",
 
                                        'message': 'i commited',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully committed to vcs/nodes.py')
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Hg - delete
 
    def test_delete_file_view_hg(self):
 
        self.log_user()
 
        response = self.app.get(url('files_delete_home',
 
                                     repo_name=HG_REPO,
 
                                     revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_delete_file_view_not_on_branch_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
            response = self.app.get(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path='vcs/nodes.py'),
 
                                          revision='tip', f_path=posixpath.join(location, filename)),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only delete files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_delete_file_view_commit_changes_hg(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='hg')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
            response = self.app.post(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path='vcs/nodes.py'),
 
                                          f_path=posixpath.join(location, filename)),
 
                                     params={
 
                                        'message': 'i commited',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully deleted file vcs/nodes.py')
 
                                   'Successfully deleted file %s' % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    # Git - delete
 
    def test_delete_file_view_git(self):
 
        self.log_user()
 
        response = self.app.get(url('files_delete_home',
 
                                     repo_name=HG_REPO,
 
                                     revision='tip', f_path='vcs/nodes.py'))
 

	
 
    def test_delete_file_view_not_on_branch_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip', f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
            response = self.app.get(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision='tip', f_path='vcs/nodes.py'),
 
                                          revision='tip', f_path=posixpath.join(location, filename)),
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                'You can only delete files with revision being a valid branch')
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
 

	
 
    def test_delete_file_view_commit_changes_git(self):
 
        self.log_user()
 
        repo = fixture.create_repo(u'test-delete-repo', repo_type='git')
 

	
 
        ## add file
 
        location = 'vcs'
 
        filename = 'nodes.py'
 
        response = self.app.post(url('files_add_home',
 
                                      repo_name=repo.repo_name,
 
                                      revision='tip',
 
                                      f_path='/'),
 
                                 params={
 
                                    'content': "def py():\n print 'hello'\n",
 
                                    'filename': filename,
 
                                    'location': location,
 
                                    '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response.follow()
 
        try:
 
            self.checkSessionFlash(response, 'Successfully committed to %s'
 
                                   % os.path.join(location, filename))
 
                                   % posixpath.join(location, filename))
 
            response = self.app.post(url('files_delete_home',
 
                                          repo_name=repo.repo_name,
 
                                          revision=repo.scm_instance.DEFAULT_BRANCH_NAME,
 
                                          f_path='vcs/nodes.py'),
 
                                          f_path=posixpath.join(location, filename)),
 
                                     params={
 
                                        'message': 'i commited',
 
                                        '_authentication_token': self.authentication_token(),
 
                                     },
 
                                    status=302)
 
            self.checkSessionFlash(response,
 
                                   'Successfully deleted file vcs/nodes.py')
 
                                   'Successfully deleted file %s' % posixpath.join(location, filename))
 
        finally:
 
            fixture.destroy_repo(repo.repo_name)
0 comments (0 inline, 0 general)