Changeset - 21f7b699d467
[Not reviewed]
kallithea/lib/colored_formatter.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 

	
 

	
 
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
 
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38)
 

	
 
# Sequences
 
RESET_SEQ = "\033[0m"
 
COLOR_SEQ = "\033[0;%dm"
 
BOLD_SEQ = "\033[1m"
 

	
 
COLORS = {
 
    'CRITICAL': MAGENTA,
 
    'ERROR': RED,
 
    'WARNING': CYAN,
 
    'INFO': GREEN,
 
    'DEBUG': BLUE,
 
    'SQL': YELLOW
 
}
 

	
 

	
 
def one_space_trim(s):
 
    if s.find("  ") == -1:
 
        return s
 
    else:
 
        s = s.replace('  ', ' ')
 
        return one_space_trim(s)
 

	
 

	
 
def format_sql(sql):
 
    sql = sql.replace('\n', '')
 
    sql = one_space_trim(sql)
 
    sql = sql \
 
        .replace(',', ',\n\t') \
 
        .replace('SELECT', '\n\tSELECT \n\t') \
 
        .replace('UPDATE', '\n\tUPDATE \n\t') \
 
        .replace('DELETE', '\n\tDELETE \n\t') \
 
        .replace('FROM', '\n\tFROM') \
 
        .replace('ORDER BY', '\n\tORDER BY') \
 
        .replace('LIMIT', '\n\tLIMIT') \
 
        .replace('WHERE', '\n\tWHERE') \
 
        .replace('AND', '\n\tAND') \
 
        .replace('LEFT', '\n\tLEFT') \
 
        .replace('INNER', '\n\tINNER') \
 
        .replace('INSERT', '\n\tINSERT') \
 
        .replace('DELETE', '\n\tDELETE')
 
    return sql
 

	
 

	
 
class ColorFormatter(logging.Formatter):
 

	
 
    def __init__(self, *args, **kwargs):
 
        # can't do super(...) here because Formatter is an old school class
kallithea/lib/helpers.py
Show inline comments
 
@@ -321,97 +321,97 @@ def _markup_whitespace(m):
 
def markup_whitespace(s):
 
    return _whitespace_re.sub(_markup_whitespace, s)
 

	
 

	
 
def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(markup_whitespace(
 
        code_highlight(safe_unicode(filenode.content), lexer, CodeHtmlFormatter(**kwargs))))
 

	
 

	
 
def hsv_to_rgb(h, s, v):
 
    if s == 0.0:
 
        return v, v, v
 
    i = int(h * 6.0)  # XXX assume int() truncates!
 
    f = (h * 6.0) - i
 
    p = v * (1.0 - s)
 
    q = v * (1.0 - s * f)
 
    t = v * (1.0 - s * (1.0 - f))
 
    i = i % 6
 
    if i == 0:
 
        return v, t, p
 
    if i == 1:
 
        return q, v, p
 
    if i == 2:
 
        return p, v, t
 
    if i == 3:
 
        return p, q, v
 
    if i == 4:
 
        return t, p, v
 
    if i == 5:
 
        return v, p, q
 

	
 

	
 
def gen_color(n=10000):
 
    """generator for getting n of evenly distributed colors using
 
    hsv color and golden ratio. It always return same order of colors
 

	
 
    :returns: RGB tuple
 
    """
 

	
 
    golden_ratio = 0.618033988749895
 
    h = 0.22717784590367374
 

	
 
    for _unused in xrange(n):
 
    for _unused in range(n):
 
        h += golden_ratio
 
        h %= 1
 
        HSV_tuple = [h, 0.95, 0.95]
 
        RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
        yield [str(int(x * 256)) for x in RGB_tuple]
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 
    cgenerator = gen_color()
 
    color_dict = {}
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = next(cgenerator)
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(changeset):
 
        author = escape(changeset.author)
 
        date = changeset.date
 
        message = escape(changeset.message)
 
        tooltip_html = ("<b>Author:</b> %s<br/>"
 
                        "<b>Date:</b> %s</b><br/>"
 
                        "<b>Message:</b> %s") % (author, date, message)
 

	
 
        lnk_format = show_id(changeset)
 
        uri = link_to(
 
                lnk_format,
 
                url('changeset_home', repo_name=repo_name,
 
                    revision=changeset.raw_id),
 
                style=get_color_string(changeset.raw_id),
 
                **{'data-toggle': 'popover',
 
                   'data-content': tooltip_html}
 
              )
 

	
 
        uri += '\n'
 
        return uri
 

	
 
    return literal(markup_whitespace(annotate_highlight(filenode, url_func, **kwargs)))
 

	
 

	
 
class _Message(object):
kallithea/lib/timerproxy.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import time
 

	
 
from sqlalchemy.interfaces import ConnectionProxy
 

	
 

	
 
log = logging.getLogger('timerproxy')
 

	
 
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
 
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38)
 

	
 

	
 
def color_sql(sql):
 
    COLOR_SEQ = "\033[1;%dm"
 
    COLOR_SQL = YELLOW
 
    normal = '\x1b[0m'
 
    return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])
 

	
 

	
 
class TimerProxy(ConnectionProxy):
 

	
 
    def __init__(self):
 
        super(TimerProxy, self).__init__()
 

	
 
    def cursor_execute(self, execute, cursor, statement, parameters,
 
                       context, executemany):
 

	
 
        now = time.time()
 
        try:
 
            log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
 
            return execute(cursor, statement, parameters, context)
 
        finally:
 
            total = time.time() - now
 
            log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
kallithea/lib/vcs/utils/progressbar.py
Show inline comments
 
# encoding: UTF-8
 

	
 
from __future__ import print_function
 

	
 
import datetime
 
import string
 
import sys
 

	
 
from kallithea.lib.vcs.utils.filesize import filesizeformat
 

	
 

	
 
class ProgressBarError(Exception):
 
    pass
 

	
 

	
 
class AlreadyFinishedError(ProgressBarError):
 
    pass
 

	
 

	
 
class ProgressBar(object):
 

	
 
    default_elements = ['percentage', 'bar', 'steps']
 

	
 
    def __init__(self, steps=100, stream=None, elements=None):
 
        self.step = 0
 
        self.steps = steps
 
        self.stream = stream or sys.stderr
 
        self.bar_char = '='
 
        self.width = 50
 
        self.separator = ' | '
 
        self.elements = elements or self.default_elements
 
        self.started = None
 
        self.finished = False
 
        self.steps_label = 'Step'
 
        self.time_label = 'Time'
 
        self.eta_label = 'ETA'
 
        self.speed_label = 'Speed'
 
        self.transfer_label = 'Transfer'
 

	
 
    def __str__(self):
 
        return self.get_line()
 

	
 
    def __iter__(self):
 
        start = self.step
 
        end = self.steps + 1
 
        for x in xrange(start, end):
 
        for x in range(start, end):
 
            self.render(x)
 
            yield x
 

	
 
    def get_separator(self):
 
        return self.separator
 

	
 
    def get_bar_char(self):
 
        return self.bar_char
 

	
 
    def get_bar(self):
 
        char = self.get_bar_char()
 
        perc = self.get_percentage()
 
        length = int(self.width * perc / 100)
 
        bar = char * length
 
        bar = bar.ljust(self.width)
 
        return bar
 

	
 
    def get_elements(self):
 
        return self.elements
 

	
 
    def get_template(self):
 
        separator = self.get_separator()
 
        elements = self.get_elements()
 
        return string.Template(separator.join((('$%s' % e) for e in elements)))
 

	
 
    def get_total_time(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if not self.started:
 
            return datetime.timedelta()
 
        return current_time - self.started
 

	
 
    def get_rendered_total_time(self):
 
        delta = self.get_total_time()
 
        if not delta:
 
            ttime = '-'
 
        else:
 
            ttime = str(delta)
 
        return '%s %s' % (self.time_label, ttime)
 

	
 
    def get_eta(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if self.step == 0:
 
            return datetime.timedelta()
 
        total_seconds = self.get_total_time().total_seconds()
 
        eta_seconds = total_seconds * self.steps / self.step - total_seconds
 
        return datetime.timedelta(seconds=int(eta_seconds))
 
@@ -314,111 +314,111 @@ class ColoredProgressBar(ProgressBar):
 
        perc = self.get_percentage()
 
        if perc > 100:
 
            color = 'blue'
 
        for max_perc, color in self.BAR_COLORS:
 
            if perc <= max_perc:
 
                break
 
        return colorize(line, fg=color)
 

	
 

	
 
class AnimatedProgressBar(ProgressBar):
 

	
 
    def get_bar_char(self):
 
        chars = '-/|\\'
 
        if self.step >= self.steps:
 
            return '='
 
        return chars[self.step % len(chars)]
 

	
 

	
 
class BarOnlyProgressBar(ProgressBar):
 

	
 
    default_elements = ['bar', 'steps']
 

	
 
    def get_bar(self):
 
        bar = super(BarOnlyProgressBar, self).get_bar()
 
        perc = self.get_percentage()
 
        perc_text = '%s%%' % int(perc)
 
        text = (' %s%% ' % (perc_text)).center(self.width, '=')
 
        L = text.find(' ')
 
        R = text.rfind(' ')
 
        bar = ' '.join((bar[:L], perc_text, bar[R:]))
 
        return bar
 

	
 

	
 
class AnimatedColoredProgressBar(AnimatedProgressBar,
 
                                 ColoredProgressBar):
 
    pass
 

	
 

	
 
class BarOnlyColoredProgressBar(ColoredProgressBar,
 
                                BarOnlyProgressBar):
 
    pass
 

	
 

	
 
def main():
 
    import time
 

	
 
    print("Standard progress bar...")
 
    bar = ProgressBar(30)
 
    for x in xrange(1, 31):
 
    for x in range(1, 31):
 
        bar.render(x)
 
        time.sleep(0.02)
 
    bar.stream.write('\n')
 
    print()
 

	
 
    print("Empty bar...")
 
    bar = ProgressBar(50)
 
    bar.render(0)
 
    print()
 
    print()
 

	
 
    print("Colored bar...")
 
    bar = ColoredProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Animated char bar...")
 
    bar = AnimatedProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Animated + colored char bar...")
 
    bar = AnimatedColoredProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Bar only ...")
 
    bar = BarOnlyProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Colored, longer bar-only, eta, total time ...")
 
    bar = BarOnlyColoredProgressBar(40)
 
    bar.width = 60
 
    bar.elements += ['time', 'eta']
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 
    print()
 

	
 
    print("File transfer bar, breaks after 2 seconds ...")
 
    total_bytes = 1024 * 1024 * 2
 
    bar = ProgressBar(total_bytes)
 
    bar.width = 50
 
    bar.elements.remove('steps')
 
    bar.elements += ['transfer', 'time', 'eta', 'speed']
 
    for x in xrange(0, bar.steps, 1024):
 
    for x in range(0, bar.steps, 1024):
 
        bar.render(x)
 
        time.sleep(0.01)
 
        now = datetime.datetime.now()
 
        if now - bar.started >= datetime.timedelta(seconds=2):
 
            break
 
    print()
 
    print()
 

	
 

	
 
if __name__ == '__main__':
 
    main()
kallithea/model/gist.py
Show inline comments
 
@@ -5,97 +5,97 @@
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.gist
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
gist model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import os
 
import random
 
import shutil
 
import time
 
import traceback
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.utils2 import AttributeDict, ascii_bytes, safe_int, safe_unicode, time_to_datetime
 
from kallithea.model.db import Gist, Session, User
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
GIST_STORE_LOC = '.rc_gist_store'
 
GIST_METADATA_FILE = '.rc_gist_metadata'
 

	
 

	
 
def make_gist_access_id():
 
    """Generate a random, URL safe, almost certainly unique gist identifier."""
 
    rnd = random.SystemRandom() # use cryptographically secure system PRNG
 
    alphabet = '23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz'
 
    length = 20
 
    return u''.join(rnd.choice(alphabet) for _ in xrange(length))
 
    return u''.join(rnd.choice(alphabet) for _ in range(length))
 

	
 

	
 
class GistModel(object):
 

	
 
    def __delete_gist(self, gist):
 
        """
 
        removes gist from filesystem
 

	
 
        :param gist: gist object
 
        """
 
        root_path = RepoModel().repos_path
 
        rm_path = os.path.join(root_path, GIST_STORE_LOC, gist.gist_access_id)
 
        log.info("Removing %s", rm_path)
 
        shutil.rmtree(rm_path)
 

	
 
    def _store_metadata(self, repo, gist_id, gist_access_id, user_id, gist_type,
 
                        gist_expires):
 
        """
 
        store metadata inside the gist, this can be later used for imports
 
        or gist identification
 
        """
 
        metadata = {
 
            'metadata_version': '1',
 
            'gist_db_id': gist_id,
 
            'gist_access_id': gist_access_id,
 
            'gist_owner_id': user_id,
 
            'gist_type': gist_type,
 
            'gist_expires': gist_expires,
 
            'gist_updated': time.time(),
 
        }
 
        with open(os.path.join(repo.path, '.hg', GIST_METADATA_FILE), 'wb') as f:
 
            f.write(ascii_bytes(ext_json.dumps(metadata)))
 

	
 
    def get_gist(self, gist):
 
        return Gist.guess_instance(gist)
 

	
 
    def get_gist_files(self, gist_access_id, revision=None):
 
        """
 
        Get files for given gist
 

	
 
        :param gist_access_id:
 
        """
 
        repo = Gist.get_by_access_id(gist_access_id)
 
        cs = repo.scm_instance.get_changeset(revision)
 
        return cs, [n for n in cs.get_node('/')]
 

	
 
    def create(self, description, owner, ip_addr, gist_mapping,
 
               gist_type=Gist.GIST_PUBLIC, lifetime=-1):
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
@@ -142,97 +142,97 @@ class Command(object):
 
    def execute(self, *args, **environ):
 
        """
 
        Runs command on the system with given ``args`` using simple space
 
        join without safe quoting.
 
        """
 
        command = ' '.join(args)
 
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
 
        if DEBUG:
 
            print('*** CMD %s ***' % command)
 
        testenv = dict(os.environ)
 
        testenv['LANG'] = 'en_US.UTF-8'
 
        testenv['LANGUAGE'] = 'en_US:en'
 
        testenv['HGPLAIN'] = ''
 
        testenv['HGRCPATH'] = ''
 
        testenv.update(environ)
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            if stdout:
 
                print('stdout:', stdout)
 
            if stderr:
 
                print('stderr:', stderr)
 
        if not ignoreReturnCode:
 
            assert p.returncode == 0
 
        return stdout, stderr
 

	
 

	
 
def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
 
    return tempfile.mkdtemp(dir=base.TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
 

	
 

	
 
def _add_files(vcs, dest_dir, files_no=3):
 
    """
 
    Generate some files, add it to dest_dir repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param dest_dir:
 
    """
 
    added_file = '%ssetup.py' % next(_RandomNameSequence())
 
    open(os.path.join(dest_dir, added_file), 'a').close()
 
    Command(dest_dir).execute(vcs, 'add', added_file)
 

	
 
    email = 'me@example.com'
 
    if os.name == 'nt':
 
        author_str = 'User <%s>' % email
 
    else:
 
        author_str = 'User ǝɯɐᴎ <%s>' % email
 
    for i in xrange(files_no):
 
    for i in range(files_no):
 
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
 
        Command(dest_dir).execute(cmd)
 
        if vcs == 'hg':
 
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        # git commit needs EMAIL on some machines
 
        Command(dest_dir).execute(cmd, EMAIL=email)
 

	
 
def _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=False, files_no=3):
 
    _add_files(vt.repo_type, dest_dir, files_no=files_no)
 
    # PUSH it back
 
    stdout = stderr = None
 
    if vt.repo_type == 'hg':
 
        stdout, stderr = Command(dest_dir).execute('hg push -f --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
 
    elif vt.repo_type == 'git':
 
        stdout, stderr = Command(dest_dir).execute('git push -f --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)
 

	
 
    return stdout, stderr
 

	
 

	
 
def _check_outgoing(vcs, cwd, clone_url):
 
    if vcs == 'hg':
 
        # hg removes the password from default URLs, so we have to provide it here via the clone_url
 
        return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True)
 
    elif vcs == 'git':
 
        Command(cwd).execute('git remote update')
 
        return Command(cwd).execute('git log origin/master..master')
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_default_user()
 
    user.active = enable
 
    Session().commit()
 
    if enable != User.get_default_user().active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 

	
 
def _check_proper_git_push(stdout, stderr):
kallithea/tests/vcs/test_archives.py
Show inline comments
 
import datetime
 
import io
 
import os
 
import tarfile
 
import tempfile
 
import zipfile
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import TESTS_TMP_PATH
 

	
 

	
 
class ArchivesTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('%d/file_%d.txt' % (x, x),
 
                        content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_archive_zip(self):
 
        path = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_zip-')[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='zip', prefix='repo')
 
        out = zipfile.ZipFile(path)
 

	
 
        for x in xrange(5):
 
        for x in range(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            decompressed = out.read('repo/' + node_path)
 
            assert decompressed == self.tip.get_node(node_path).content
 

	
 
    def test_archive_tgz(self):
 
        path = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_tgz-')[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='tgz', prefix='repo')
 
        outdir = tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix='test_archive_tgz-', suffix='-outdir')
 

	
 
        outfile = tarfile.open(path, 'r|gz')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
        for x in range(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            assert open(os.path.join(outdir, 'repo/' + node_path), 'rb').read() == self.tip.get_node(node_path).content
 

	
 
    def test_archive_tbz2(self):
 
        path = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_tbz2-')[1]
 
        with open(path, 'w+b') as f:
 
            self.tip.fill_archive(stream=f, kind='tbz2', prefix='repo')
 
        outdir = tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix='test_archive_tbz2-', suffix='-outdir')
 

	
 
        outfile = tarfile.open(path, 'r|bz2')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
        for x in range(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            assert open(os.path.join(outdir, 'repo/' + node_path), 'rb').read() == self.tip.get_node(node_path).content
 

	
 
    def test_archive_default_stream(self):
 
        tmppath = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_default_stream-')[1]
 
        with open(tmppath, 'wb') as stream:
 
            self.tip.fill_archive(stream=stream)
 
        mystream = io.BytesIO()
 
        self.tip.fill_archive(stream=mystream)
 
        mystream.seek(0)
 
        with open(tmppath, 'rb') as f:
 
            file_content = f.read()
 
            stringio_content = mystream.read()
 
            # the gzip header contains a MTIME header
 
            # because is takes a little bit of time from one fill_archive call to the next
 
            # this part may differ so don't include that part in the comparison
 
            assert file_content[:4] == stringio_content[:4]
 
            assert file_content[8:] == stringio_content[8:]
 

	
 
    def test_archive_wrong_kind(self):
 
        with pytest.raises(VCSError):
 
            self.tip.fill_archive(kind='wrong kind')
 

	
 
    def test_archive_empty_prefix(self):
 
        with pytest.raises(VCSError):
 
            self.tip.fill_archive(prefix='')
 

	
 
    def test_archive_prefix_with_leading_slash(self):
 
        with pytest.raises(VCSError):
 
            self.tip.fill_archive(prefix='/any')
 

	
 

	
 
class TestGitArchive(ArchivesTestCaseMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgArchive(ArchivesTestCaseMixin):
 
    backend_alias = 'hg'
kallithea/tests/vcs/test_changesets.py
Show inline comments
 
# encoding: utf-8
 

	
 
import datetime
 

	
 
import pytest
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.vcs.backends.base import BaseChangeset
 
from kallithea.lib.vcs.exceptions import BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError
 
from kallithea.lib.vcs.nodes import AddedFileNodesGenerator, ChangedFileNodesGenerator, FileNode, RemovedFileNodesGenerator
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class TestBaseChangeset(object):
 

	
 
    def test_as_dict(self):
 
        changeset = BaseChangeset()
 
        changeset.raw_id = 'RAW_ID'
 
        changeset.short_id = 'SHORT_ID'
 
        changeset.revision = 1009
 
        changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
 
        changeset.message = 'Message of a commit'
 
        changeset.author = 'Joe Doe <joe.doe@example.com>'
 
        changeset.added = [FileNode('foo/bar/baz'), FileNode(u'foobar'), FileNode(u'blåbærgrød')]
 
        changeset.changed = []
 
        changeset.removed = []
 
        assert changeset.as_dict() == {
 
            'raw_id': 'RAW_ID',
 
            'short_id': 'SHORT_ID',
 
            'revision': 1009,
 
            'date': datetime.datetime(2011, 1, 30, 1, 45),
 
            'message': 'Message of a commit',
 
            'author': {
 
                'name': 'Joe Doe',
 
                'email': 'joe.doe@example.com',
 
            },
 
            'added': ['foo/bar/baz', 'foobar', u'bl\xe5b\xe6rgr\xf8d'],
 
            'changed': [],
 
            'removed': [],
 
        }
 

	
 

	
 
class _ChangesetsWithCommitsTestCaseixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_new_branch(self):
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        assert 'foobar' in self.repo.branches
 
        assert foobar_tip.branch == 'foobar'
 
        assert foobar_tip.branches == ['foobar']
 
        # 'foobar' should be the only branch that contains the new commit
 
        branch_tips = list(self.repo.branches.values())
 
        assert branch_tips.count(str(foobar_tip.raw_id)) == 1
 

	
 
    def test_new_head_in_default_branch(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
 
        )
 

	
 
        newest_tip = self.imc.commit(
 
            message=u'Merged with %s' % foobar_tip.raw_id,
 
            author=u'joe',
 
            branch=self.backend_class.DEFAULT_BRANCH_NAME,
 
            parents=[newtip, foobar_tip],
 
@@ -98,192 +98,192 @@ class _ChangesetsWithCommitsTestCaseixin
 

	
 
        assert newest_tip.branch == self.backend_class.DEFAULT_BRANCH_NAME
 
        assert newest_tip.branches == [self.backend_class.DEFAULT_BRANCH_NAME]
 

	
 
    def test_get_changesets_respects_branch_name(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        doc_changeset = self.imc.commit(
 
            message=u'New branch: docs',
 
            author=u'joe',
 
            branch='docs',
 
        )
 
        self.imc.add(vcs.nodes.FileNode('newfile', content=''))
 
        self.imc.commit(
 
            message=u'Back in default branch',
 
            author=u'joe',
 
            parents=[tip],
 
        )
 
        default_branch_changesets = self.repo.get_changesets(
 
            branch_name=self.repo.DEFAULT_BRANCH_NAME)
 
        assert doc_changeset not in default_branch_changesets
 

	
 
    def test_get_changeset_by_branch(self):
 
        for branch, sha in self.repo.branches.items():
 
            assert sha == self.repo.get_changeset(branch).raw_id
 

	
 
    def test_get_changeset_by_tag(self):
 
        for tag, sha in self.repo.tags.items():
 
            assert sha == self.repo.get_changeset(tag).raw_id
 

	
 
    def test_get_changeset_parents(self):
 
        for test_rev in [1, 2, 3]:
 
            sha = self.repo.get_changeset(test_rev-1)
 
            assert [sha] == self.repo.get_changeset(test_rev).parents
 

	
 
    def test_get_changeset_children(self):
 
        for test_rev in [1, 2, 3]:
 
            sha = self.repo.get_changeset(test_rev+1)
 
            assert [sha] == self.repo.get_changeset(test_rev).children
 

	
 

	
 
class _ChangesetsTestCaseMixin(_BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': u'Commit %d' % x,
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_simple(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.date == datetime.datetime(2010, 1, 3, 20)
 

	
 
    def test_get_changesets_is_ordered_by_date(self):
 
        changesets = list(self.repo.get_changesets())
 
        ordered_by_date = sorted(changesets,
 
            key=lambda cs: cs.date)
 

	
 
        assert changesets == ordered_by_date
 

	
 
    def test_get_changesets_respects_start(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        assert len(changesets) == 4
 

	
 
    def test_get_changesets_numerical_id_respects_start(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        assert len(changesets) == 4
 

	
 
    def test_get_changesets_includes_start_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        assert changesets[0].raw_id == second_id
 

	
 
    def test_get_changesets_respects_end(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        assert changesets[-1].raw_id == second_id
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_numerical_id_respects_end(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        assert changesets.index(changesets[-1]) == second_id
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_respects_both_start_and_end(self):
 
        second_id = self.repo.revisions[1]
 
        third_id = self.repo.revisions[2]
 
        changesets = list(self.repo.get_changesets(start=second_id,
 
            end=third_id))
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end(self):
 
        changesets = list(self.repo.get_changesets(start=2, end=3))
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_on_empty_repo_raises_EmptyRepository_error(self):
 
        repo = self.setup_empty_repo(self.backend_class)
 
        with pytest.raises(EmptyRepositoryError):
 
            list(repo.get_changesets(start='foobar'))
 

	
 
    def test_get_changesets_includes_end_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        assert changesets[-1].raw_id == second_id
 

	
 
    def test_get_changesets_respects_start_date(self):
 
        start_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date):
 
            assert cs.date >= start_date
 

	
 
    def test_get_changesets_respects_end_date(self):
 
        start_date = datetime.datetime(2010, 1, 1)
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date,
 
                                           end_date=end_date):
 
            assert cs.date >= start_date
 
            assert cs.date <= end_date
 

	
 
    def test_get_changesets_respects_start_date_and_end_date(self):
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(end_date=end_date):
 
            assert cs.date <= end_date
 

	
 
    def test_get_changesets_respects_reverse(self):
 
        changesets_id_list = [cs.raw_id for cs in
 
            self.repo.get_changesets(reverse=True)]
 
        assert changesets_id_list == list(reversed(self.repo.revisions))
 

	
 
    def test_get_filenodes_generator(self):
 
        tip = self.repo.get_changeset()
 
        filepaths = [node.path for node in tip.get_filenodes_generator()]
 
        assert filepaths == ['file_%d.txt' % x for x in xrange(5)]
 
        assert filepaths == ['file_%d.txt' % x for x in range(5)]
 

	
 
    def test_size(self):
 
        tip = self.repo.get_changeset()
 
        size = 5 * len('Foobar N') # Size of 5 files
 
        assert tip.size == size
 

	
 
    def test_author(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.author == u'Joe Doe <joe.doe@example.com>'
 

	
 
    def test_author_name(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.author_name == u'Joe Doe'
 

	
 
    def test_author_email(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.author_email == u'joe.doe@example.com'
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_start(self):
 
        with pytest.raises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(start='foobar'))
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_end(self):
 
        with pytest.raises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(end='foobar'))
 

	
 
    def test_get_changesets_raise_branchdoesnotexist_for_wrong_branch_name(self):
 
        with pytest.raises(BranchDoesNotExistError):
 
            list(self.repo.get_changesets(branch_name='foobar'))
 

	
 
    def test_get_changesets_raise_repositoryerror_for_wrong_start_end(self):
 
        start = self.repo.revisions[-1]
 
        end = self.repo.revisions[0]
 
        with pytest.raises(RepositoryError):
 
            list(self.repo.get_changesets(start=start, end=end))
 

	
 
    def test_get_changesets_numerical_id_reversed(self):
 
        with pytest.raises(RepositoryError):
 
            [x for x in self.repo.get_changesets(start=3, end=2)]
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end_last(self):
 
        with pytest.raises(RepositoryError):
 
            last = len(self.repo.revisions)
 
            list(self.repo.get_changesets(start=last-1, end=last-2))
 

	
 
    def test_get_changesets_numerical_id_last_zero_error(self):
 
        with pytest.raises(RepositoryError):
 
            last = len(self.repo.revisions)
kallithea/tests/vcs/test_getitem.py
Show inline comments
 
import datetime
 

	
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class GetitemTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getitem__last_item_is_tip(self):
 
        assert self.repo[-1] == self.repo.get_changeset()
 

	
 
    def test__getitem__returns_correct_items(self):
 
        changesets = [self.repo[x] for x in xrange(len(self.repo.revisions))]
 
        changesets = [self.repo[x] for x in range(len(self.repo.revisions))]
 
        assert changesets == list(self.repo.get_changesets())
 

	
 

	
 
class TestGitGetitem(GetitemTestCaseMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgGetitem(GetitemTestCaseMixin):
 
    backend_alias = 'hg'
kallithea/tests/vcs/test_getslice.py
Show inline comments
 
import datetime
 

	
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class GetsliceTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getslice__last_item_is_tip(self):
 
        assert list(self.repo[-1:])[0] == self.repo.get_changeset()
 

	
 
    def test__getslice__respects_start_index(self):
 
        assert list(self.repo[2:]) == [self.repo.get_changeset(rev) for rev in self.repo.revisions[2:]]
 

	
 
    def test__getslice__respects_negative_start_index(self):
 
        assert list(self.repo[-2:]) == [self.repo.get_changeset(rev) for rev in self.repo.revisions[-2:]]
 

	
 
    def test__getslice__respects_end_index(self):
 
        assert list(self.repo[:2]) == [self.repo.get_changeset(rev) for rev in self.repo.revisions[:2]]
 

	
 
    def test__getslice__respects_negative_end_index(self):
 
        assert list(self.repo[:-2]) == [self.repo.get_changeset(rev) for rev in self.repo.revisions[:-2]]
 

	
 

	
 
class TestGitGetslice(GetsliceTestCaseMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgGetslice(GetsliceTestCaseMixin):
 
    backend_alias = 'hg'
kallithea/tests/vcs/test_inmemchangesets.py
Show inline comments
 
@@ -255,79 +255,79 @@ class InMemoryChangesetTestMixin(_Backen
 
        self.imc.remove(self.nodes[0])
 
        with pytest.raises(NodeDoesNotExistError):
 
            self.imc.commit(
 
                message='Trying to remove node at empty repository',
 
                author=str(self)
 
            )
 

	
 
    def test_check_integrity_remove_raise_node_does_not_exist(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode('no-such-file')
 
        self.imc.remove(node)
 
        with pytest.raises(NodeDoesNotExistError):
 
            self.imc.commit(
 
                message=u'Trying to remove not existing node',
 
                author=unicode(self)
 
            )
 

	
 
    def test_remove_raise_node_already_removed(self):
 
        self.test_add() # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path)
 
        self.imc.remove(node)
 
        with pytest.raises(NodeAlreadyRemovedError):
 
            self.imc.remove(node)
 

	
 
    def test_remove_raise_node_already_changed(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path, content='Bending time')
 
        self.imc.change(node)
 
        with pytest.raises(NodeAlreadyChangedError):
 
            self.imc.remove(node)
 

	
 
    def test_reset(self):
 
        self.imc.add(FileNode('foo', content='bar'))
 
        #self.imc.change(FileNode('baz', content='new'))
 
        #self.imc.remove(FileNode('qwe'))
 
        self.imc.reset()
 
        assert not any((
 
            self.imc.added,
 
            self.imc.changed,
 
            self.imc.removed
 
        ))
 

	
 
    def test_multiple_commits(self):
 
        N = 3  # number of commits to perform
 
        last = None
 
        for x in xrange(N):
 
        for x in range(N):
 
            fname = 'file%s' % str(x).rjust(5, '0')
 
            content = 'foobar\n' * x
 
            node = FileNode(fname, content=content)
 
            self.imc.add(node)
 
            commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs')
 
            assert last != commit
 
            last = commit
 

	
 
        # Check commit number for same repo
 
        assert len(self.repo.revisions) == N
 

	
 
        # Check commit number for recreated repo
 
        assert len(self.repo.revisions) == N
 

	
 
    def test_date_attr(self):
 
        node = FileNode('foobar.txt', content='Foobared!')
 
        self.imc.add(node)
 
        date = datetime.datetime(1985, 1, 30, 1, 45)
 
        commit = self.imc.commit(u"Committed at time when I was born ;-)",
 
            author=u'lb <lb@example.com>', date=date)
 

	
 
        assert commit.date == date
 

	
 

	
 
class TestGitInMemoryChangeset(InMemoryChangesetTestMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgInMemoryChangeset(InMemoryChangesetTestMixin):
 
    backend_alias = 'hg'
0 comments (0 inline, 0 general)