Changeset - 21f7b699d467
[Not reviewed]
kallithea/lib/colored_formatter.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 

	
 

	
 
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
 
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38)
 

	
 
# Sequences
 
RESET_SEQ = "\033[0m"
 
COLOR_SEQ = "\033[0;%dm"
 
BOLD_SEQ = "\033[1m"
 

	
 
COLORS = {
 
    'CRITICAL': MAGENTA,
 
    'ERROR': RED,
 
    'WARNING': CYAN,
 
    'INFO': GREEN,
 
    'DEBUG': BLUE,
 
    'SQL': YELLOW
 
}
 

	
 

	
 
def one_space_trim(s):
 
    if s.find("  ") == -1:
 
        return s
 
    else:
 
        s = s.replace('  ', ' ')
 
        return one_space_trim(s)
 

	
 

	
 
def format_sql(sql):
 
    sql = sql.replace('\n', '')
 
    sql = one_space_trim(sql)
 
    sql = sql \
 
        .replace(',', ',\n\t') \
 
        .replace('SELECT', '\n\tSELECT \n\t') \
 
        .replace('UPDATE', '\n\tUPDATE \n\t') \
 
        .replace('DELETE', '\n\tDELETE \n\t') \
 
        .replace('FROM', '\n\tFROM') \
 
        .replace('ORDER BY', '\n\tORDER BY') \
 
        .replace('LIMIT', '\n\tLIMIT') \
 
        .replace('WHERE', '\n\tWHERE') \
 
        .replace('AND', '\n\tAND') \
 
        .replace('LEFT', '\n\tLEFT') \
 
        .replace('INNER', '\n\tINNER') \
 
        .replace('INSERT', '\n\tINSERT') \
 
        .replace('DELETE', '\n\tDELETE')
 
    return sql
 

	
 

	
 
class ColorFormatter(logging.Formatter):
 

	
 
    def __init__(self, *args, **kwargs):
 
        # can't do super(...) here because Formatter is an old school class
 
        logging.Formatter.__init__(self, *args, **kwargs)
 

	
 
    def format(self, record):
 
        """
 
        Changes record's levelname to use with COLORS enum
 
        """
 

	
 
        levelname = record.levelname
 
        start = COLOR_SEQ % (COLORS[levelname])
 
        def_record = logging.Formatter.format(self, record)
 
        end = RESET_SEQ
 

	
 
        colored_record = ''.join([start, def_record, end])
 
        return colored_record
 

	
 

	
 
class ColorFormatterSql(logging.Formatter):
 

	
 
    def __init__(self, *args, **kwargs):
 
        # can't do super(...) here because Formatter is an old school class
 
        logging.Formatter.__init__(self, *args, **kwargs)
 

	
 
    def format(self, record):
 
        """
 
        Changes record's levelname to use with COLORS enum
 
        """
 

	
 
        start = COLOR_SEQ % (COLORS['SQL'])
 
        def_record = format_sql(logging.Formatter.format(self, record))
 
        end = RESET_SEQ
 

	
 
        colored_record = ''.join([start, def_record, end])
 
        return colored_record
kallithea/lib/helpers.py
Show inline comments
 
@@ -177,385 +177,385 @@ def select(name, selected_values, option
 
                    if isinstance(x, tuple) and len(x) == 2:
 
                        group_value, group_label = x
 
                    elif isinstance(x, basestring):
 
                        group_value = group_label = x
 
                    else:
 
                        log.error('invalid select option %r', x)
 
                        raise
 
                    og.add_option(group_label, group_value)
 
            else:
 
                options.add_option(label, value)
 
    return webhelpers2_select(name, selected_values, options, id=id, **attrs)
 

	
 

	
 
safeid = _make_safe_id_component
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a unique ID for filenode based on it's hash of path and revision
 
    it's safe to use in urls
 

	
 
    :param raw_id:
 
    :param path:
 
    """
 

	
 
    return 'C-%s-%s' % (short_id(raw_id), hashlib.md5(safe_bytes(path)).hexdigest()[:12])
 

	
 

	
 
class _FilesBreadCrumbs(object):
 

	
 
    def __call__(self, repo_name, rev, paths):
 
        if isinstance(paths, str):
 
            paths = safe_unicode(paths)
 
        url_l = [link_to(repo_name, url('files_home',
 
                                        repo_name=repo_name,
 
                                        revision=rev, f_path=''),
 
                         class_='ypjax-link')]
 
        paths_l = paths.split('/')
 
        for cnt, p in enumerate(paths_l):
 
            if p != '':
 
                url_l.append(link_to(p,
 
                                     url('files_home',
 
                                         repo_name=repo_name,
 
                                         revision=rev,
 
                                         f_path='/'.join(paths_l[:cnt + 1])
 
                                         ),
 
                                     class_='ypjax-link'
 
                                     )
 
                             )
 

	
 
        return literal('/'.join(url_l))
 

	
 

	
 
files_breadcrumbs = _FilesBreadCrumbs()
 

	
 

	
 
class CodeHtmlFormatter(HtmlFormatter):
 
    """
 
    My code Html Formatter for source codes
 
    """
 

	
 
    def wrap(self, source, outfile):
 
        return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
 

	
 
    def _wrap_code(self, source):
 
        for cnt, it in enumerate(source):
 
            i, t = it
 
            t = '<span id="L%s">%s</span>' % (cnt + 1, t)
 
            yield i, t
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        inner_lines = []
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            inner_lines.append(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        nocls = self.noclasses
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s%d" class="special">%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        # in case you wonder about the seemingly redundant <div> here: since the
 
        # content in the other cell also is wrapped in a div, some browsers in
 
        # some configurations seem to mess up the formatting...
 
        if nocls:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        else:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td class="linenos"><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        yield 0, ''.join(inner_lines)
 
        yield 0, '</td></tr></table>'
 

	
 

	
 
_whitespace_re = re.compile(r'(\t)|( )(?=\n|</div>)')
 

	
 

	
 
def _markup_whitespace(m):
 
    groups = m.groups()
 
    if groups[0]:
 
        return '<u>\t</u>'
 
    if groups[1]:
 
        return ' <i></i>'
 

	
 

	
 
def markup_whitespace(s):
 
    return _whitespace_re.sub(_markup_whitespace, s)
 

	
 

	
 
def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(markup_whitespace(
 
        code_highlight(safe_unicode(filenode.content), lexer, CodeHtmlFormatter(**kwargs))))
 

	
 

	
 
def hsv_to_rgb(h, s, v):
 
    if s == 0.0:
 
        return v, v, v
 
    i = int(h * 6.0)  # XXX assume int() truncates!
 
    f = (h * 6.0) - i
 
    p = v * (1.0 - s)
 
    q = v * (1.0 - s * f)
 
    t = v * (1.0 - s * (1.0 - f))
 
    i = i % 6
 
    if i == 0:
 
        return v, t, p
 
    if i == 1:
 
        return q, v, p
 
    if i == 2:
 
        return p, v, t
 
    if i == 3:
 
        return p, q, v
 
    if i == 4:
 
        return t, p, v
 
    if i == 5:
 
        return v, p, q
 

	
 

	
 
def gen_color(n=10000):
 
    """generator for getting n of evenly distributed colors using
 
    hsv color and golden ratio. It always return same order of colors
 

	
 
    :returns: RGB tuple
 
    """
 

	
 
    golden_ratio = 0.618033988749895
 
    h = 0.22717784590367374
 

	
 
    for _unused in xrange(n):
 
    for _unused in range(n):
 
        h += golden_ratio
 
        h %= 1
 
        HSV_tuple = [h, 0.95, 0.95]
 
        RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
        yield [str(int(x * 256)) for x in RGB_tuple]
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 
    cgenerator = gen_color()
 
    color_dict = {}
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = next(cgenerator)
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(changeset):
 
        author = escape(changeset.author)
 
        date = changeset.date
 
        message = escape(changeset.message)
 
        tooltip_html = ("<b>Author:</b> %s<br/>"
 
                        "<b>Date:</b> %s</b><br/>"
 
                        "<b>Message:</b> %s") % (author, date, message)
 

	
 
        lnk_format = show_id(changeset)
 
        uri = link_to(
 
                lnk_format,
 
                url('changeset_home', repo_name=repo_name,
 
                    revision=changeset.raw_id),
 
                style=get_color_string(changeset.raw_id),
 
                **{'data-toggle': 'popover',
 
                   'data-content': tooltip_html}
 
              )
 

	
 
        uri += '\n'
 
        return uri
 

	
 
    return literal(markup_whitespace(annotate_highlight(filenode, url_func, **kwargs)))
 

	
 

	
 
class _Message(object):
 
    """A message returned by ``pop_flash_messages()``.
 

	
 
    Converting the message to a string returns the message text. Instances
 
    also have the following attributes:
 

	
 
    * ``category``: the category specified when the message was created.
 
    * ``message``: the html-safe message text.
 
    """
 

	
 
    def __init__(self, category, message):
 
        self.category = category
 
        self.message = message
 

	
 

	
 
def _session_flash_messages(append=None, clear=False):
 
    """Manage a message queue in tg.session: return the current message queue
 
    after appending the given message, and possibly clearing the queue."""
 
    key = 'flash'
 
    from tg import session
 
    if key in session:
 
        flash_messages = session[key]
 
    else:
 
        if append is None:  # common fast path - also used for clearing empty queue
 
            return []  # don't bother saving
 
        flash_messages = []
 
        session[key] = flash_messages
 
    if append is not None and append not in flash_messages:
 
        flash_messages.append(append)
 
    if clear:
 
        session.pop(key, None)
 
    session.save()
 
    return flash_messages
 

	
 

	
 
def flash(message, category, logf=None):
 
    """
 
    Show a message to the user _and_ log it through the specified function
 

	
 
    category: notice (default), warning, error, success
 
    logf: a custom log function - such as log.debug
 

	
 
    logf defaults to log.info, unless category equals 'success', in which
 
    case logf defaults to log.debug.
 
    """
 
    assert category in ('error', 'success', 'warning'), category
 
    if hasattr(message, '__html__'):
 
        # render to HTML for storing in cookie
 
        safe_message = unicode(message)
 
    else:
 
        # Apply str - the message might be an exception with __str__
 
        # Escape, so we can trust the result without further escaping, without any risk of injection
 
        safe_message = html_escape(unicode(message))
 
    if logf is None:
 
        logf = log.info
 
        if category == 'success':
 
            logf = log.debug
 

	
 
    logf('Flash %s: %s', category, safe_message)
 

	
 
    _session_flash_messages(append=(category, safe_message))
 

	
 

	
 
def pop_flash_messages():
 
    """Return all accumulated messages and delete them from the session.
 

	
 
    The return value is a list of ``Message`` objects.
 
    """
 
    return [_Message(category, message) for category, message in _session_flash_messages(clear=True)]
 

	
 

	
 
age = lambda x, y=False: _age(x, y)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    from kallithea import CONFIG
 
    def_len = safe_int(CONFIG.get('show_sha_length', 12))
 
    show_rev = str2bool(CONFIG.get('show_revision_number', False))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return raw_id
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        return date.strftime("%Y-%m-%d %H:%M:%S")
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
@cache_region('long_term', 'user_attr_or_none')
 
def user_attr_or_none(author, show_attr):
 
    """Try to match email part of VCS committer string with a local user and return show_attr
 
    - or return None if user not found"""
 
    email = author_email(author)
 
    if email:
 
        from kallithea.model.db import User
 
        user = User.get_by_email(email, cache=True) # cache will only use sql_cache_short
 
        if user is not None:
 
            return getattr(user, show_attr)
 
    return None
 

	
 

	
 
def email_or_none(author):
 
    """Try to match email part of VCS committer string with a local user.
 
    Return primary email of user, email part of the specified author name, or None."""
 
    if not author:
 
        return None
 
    email = user_attr_or_none(author, 'email')
 
    if email is not None:
 
        return email # always use user's main email address - not necessarily the one used to find user
 

	
 
    # extract email from the commit string
kallithea/lib/timerproxy.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import time
 

	
 
from sqlalchemy.interfaces import ConnectionProxy
 

	
 

	
 
log = logging.getLogger('timerproxy')
 

	
 
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
 
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(30, 38)
 

	
 

	
 
def color_sql(sql):
 
    COLOR_SEQ = "\033[1;%dm"
 
    COLOR_SQL = YELLOW
 
    normal = '\x1b[0m'
 
    return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])
 

	
 

	
 
class TimerProxy(ConnectionProxy):
 

	
 
    def __init__(self):
 
        super(TimerProxy, self).__init__()
 

	
 
    def cursor_execute(self, execute, cursor, statement, parameters,
 
                       context, executemany):
 

	
 
        now = time.time()
 
        try:
 
            log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
 
            return execute(cursor, statement, parameters, context)
 
        finally:
 
            total = time.time() - now
 
            log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
kallithea/lib/vcs/utils/progressbar.py
Show inline comments
 
# encoding: UTF-8
 

	
 
from __future__ import print_function
 

	
 
import datetime
 
import string
 
import sys
 

	
 
from kallithea.lib.vcs.utils.filesize import filesizeformat
 

	
 

	
 
class ProgressBarError(Exception):
 
    pass
 

	
 

	
 
class AlreadyFinishedError(ProgressBarError):
 
    pass
 

	
 

	
 
class ProgressBar(object):
 

	
 
    default_elements = ['percentage', 'bar', 'steps']
 

	
 
    def __init__(self, steps=100, stream=None, elements=None):
 
        self.step = 0
 
        self.steps = steps
 
        self.stream = stream or sys.stderr
 
        self.bar_char = '='
 
        self.width = 50
 
        self.separator = ' | '
 
        self.elements = elements or self.default_elements
 
        self.started = None
 
        self.finished = False
 
        self.steps_label = 'Step'
 
        self.time_label = 'Time'
 
        self.eta_label = 'ETA'
 
        self.speed_label = 'Speed'
 
        self.transfer_label = 'Transfer'
 

	
 
    def __str__(self):
 
        return self.get_line()
 

	
 
    def __iter__(self):
 
        start = self.step
 
        end = self.steps + 1
 
        for x in xrange(start, end):
 
        for x in range(start, end):
 
            self.render(x)
 
            yield x
 

	
 
    def get_separator(self):
 
        return self.separator
 

	
 
    def get_bar_char(self):
 
        return self.bar_char
 

	
 
    def get_bar(self):
 
        char = self.get_bar_char()
 
        perc = self.get_percentage()
 
        length = int(self.width * perc / 100)
 
        bar = char * length
 
        bar = bar.ljust(self.width)
 
        return bar
 

	
 
    def get_elements(self):
 
        return self.elements
 

	
 
    def get_template(self):
 
        separator = self.get_separator()
 
        elements = self.get_elements()
 
        return string.Template(separator.join((('$%s' % e) for e in elements)))
 

	
 
    def get_total_time(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if not self.started:
 
            return datetime.timedelta()
 
        return current_time - self.started
 

	
 
    def get_rendered_total_time(self):
 
        delta = self.get_total_time()
 
        if not delta:
 
            ttime = '-'
 
        else:
 
            ttime = str(delta)
 
        return '%s %s' % (self.time_label, ttime)
 

	
 
    def get_eta(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if self.step == 0:
 
            return datetime.timedelta()
 
        total_seconds = self.get_total_time().total_seconds()
 
        eta_seconds = total_seconds * self.steps / self.step - total_seconds
 
        return datetime.timedelta(seconds=int(eta_seconds))
 

	
 
    def get_rendered_eta(self):
 
        eta = self.get_eta()
 
        if not eta:
 
            eta = '--:--:--'
 
        else:
 
            eta = str(eta).rjust(8)
 
        return '%s: %s' % (self.eta_label, eta)
 

	
 
    def get_percentage(self):
 
        return float(self.step) / self.steps * 100
 

	
 
    def get_rendered_percentage(self):
 
        perc = self.get_percentage()
 
        return ('%s%%' % (int(perc))).rjust(5)
 

	
 
    def get_rendered_steps(self):
 
        return '%s: %s/%s' % (self.steps_label, self.step, self.steps)
 

	
 
    def get_rendered_speed(self, step=None, total_seconds=None):
 
        if step is None:
 
            step = self.step
 
        if total_seconds is None:
 
            total_seconds = self.get_total_time().total_seconds()
 
        if step <= 0 or total_seconds <= 0:
 
            speed = '-'
 
        else:
 
            speed = filesizeformat(float(step) / total_seconds)
 
        return '%s: %s/s' % (self.speed_label, speed)
 

	
 
    def get_rendered_transfer(self, step=None, steps=None):
 
        if step is None:
 
            step = self.step
 
        if steps is None:
 
            steps = self.steps
 

	
 
        if steps <= 0:
 
            return '%s: -' % self.transfer_label
 
        total = filesizeformat(float(steps))
 
        if step <= 0:
 
            transferred = '-'
 
        else:
 
            transferred = filesizeformat(float(step))
 
        return '%s: %s / %s' % (self.transfer_label, transferred, total)
 

	
 
    def get_context(self):
 
        return {
 
            'percentage': self.get_rendered_percentage(),
 
            'bar': self.get_bar(),
 
            'steps': self.get_rendered_steps(),
 
            'time': self.get_rendered_total_time(),
 
            'eta': self.get_rendered_eta(),
 
            'speed': self.get_rendered_speed(),
 
            'transfer': self.get_rendered_transfer(),
 
        }
 

	
 
    def get_line(self):
 
        template = self.get_template()
 
        context = self.get_context()
 
        return template.safe_substitute(**context)
 

	
 
    def write(self, data):
 
        self.stream.write(data)
 

	
 
    def render(self, step):
 
        if not self.started:
 
            self.started = datetime.datetime.now()
 
        if self.finished:
 
            raise AlreadyFinishedError
 
        self.step = step
 
        self.write('\r%s' % self)
 
        if step == self.steps:
 
            self.finished = True
 
        if step == self.steps:
 
            self.write('\n')
 

	
 

	
 
"""
 
termcolors.py
 

	
 
Grabbed from Django (http://www.djangoproject.com)
 
"""
 

	
 
color_names = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
 
foreground = dict([(color_names[x], '3%s' % x) for x in range(8)])
 
background = dict([(color_names[x], '4%s' % x) for x in range(8)])
 

	
 
RESET = '0'
 
opt_dict = {'bold': '1', 'underscore': '4', 'blink': '5', 'reverse': '7', 'conceal': '8'}
 

	
 

	
 
def colorize(text='', opts=(), **kwargs):
 
    """
 
    Returns your text, enclosed in ANSI graphics codes.
 

	
 
    Depends on the keyword arguments 'fg' and 'bg', and the contents of
 
    the opts tuple/list.
 

	
 
    Returns the RESET code if no parameters are given.
 

	
 
    Valid colors:
 
        'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white'
 

	
 
    Valid options:
 
        'bold'
 
        'underscore'
 
        'blink'
 
        'reverse'
 
        'conceal'
 
        'noreset' - string will not be auto-terminated with the RESET code
 

	
 
    Examples:
 
        colorize('hello', fg='red', bg='blue', opts=('blink',))
 
        colorize()
 
        colorize('goodbye', opts=('underscore',))
 
        print colorize('first line', fg='red', opts=('noreset',))
 
        print 'this should be red too'
 
        print colorize('and so should this')
 
        print 'this should not be red'
 
    """
 
    code_list = []
 
    if text == '' and len(opts) == 1 and opts[0] == 'reset':
 
        return '\x1b[%sm' % RESET
 
    for k, v in kwargs.items():
 
        if k == 'fg':
 
            code_list.append(foreground[v])
 
        elif k == 'bg':
 
            code_list.append(background[v])
 
    for o in opts:
 
        if o in opt_dict:
 
            code_list.append(opt_dict[o])
 
    if 'noreset' not in opts:
 
        text = text + '\x1b[%sm' % RESET
 
    return ('\x1b[%sm' % ';'.join(code_list)) + text
 

	
 

	
 
def make_style(opts=(), **kwargs):
 
    """
 
    Returns a function with default parameters for colorize()
 

	
 
    Example:
 
        bold_red = make_style(opts=('bold',), fg='red')
 
        print bold_red('hello')
 
        KEYWORD = make_style(fg='yellow')
 
        COMMENT = make_style(fg='blue', opts=('bold',))
 
    """
 
    return lambda text: colorize(text, opts, **kwargs)
 

	
 

	
 
NOCOLOR_PALETTE = 'nocolor'
 
DARK_PALETTE = 'dark'
 
LIGHT_PALETTE = 'light'
 

	
 
PALETTES = {
 
    NOCOLOR_PALETTE: {
 
        'ERROR':        {},
 
        'NOTICE':       {},
 
        'SQL_FIELD':    {},
 
        'SQL_COLTYPE':  {},
 
        'SQL_KEYWORD':  {},
 
        'SQL_TABLE':    {},
 
        'HTTP_INFO':         {},
 
        'HTTP_SUCCESS':      {},
 
        'HTTP_REDIRECT':     {},
 
        'HTTP_NOT_MODIFIED': {},
 
        'HTTP_BAD_REQUEST':  {},
 
        'HTTP_NOT_FOUND':    {},
 
        'HTTP_SERVER_ERROR': {},
 
    },
 
    DARK_PALETTE: {
 
        'ERROR':        { 'fg': 'red', 'opts': ('bold',) },
 
        'NOTICE':       { 'fg': 'red' },
 
        'SQL_FIELD':    { 'fg': 'green', 'opts': ('bold',) },
 
        'SQL_COLTYPE':  { 'fg': 'green' },
 
        'SQL_KEYWORD':  { 'fg': 'yellow' },
 
        'SQL_TABLE':    { 'opts': ('bold',) },
 
        'HTTP_INFO':         { 'opts': ('bold',) },
 
        'HTTP_SUCCESS':      { },
 
        'HTTP_REDIRECT':     { 'fg': 'green' },
 
        'HTTP_NOT_MODIFIED': { 'fg': 'cyan' },
 
        'HTTP_BAD_REQUEST':  { 'fg': 'red', 'opts': ('bold',) },
 
        'HTTP_NOT_FOUND':    { 'fg': 'yellow' },
 
        'HTTP_SERVER_ERROR': { 'fg': 'magenta', 'opts': ('bold',) },
 
    },
 
    LIGHT_PALETTE: {
 
        'ERROR':        { 'fg': 'red', 'opts': ('bold',) },
 
        'NOTICE':       { 'fg': 'red' },
 
        'SQL_FIELD':    { 'fg': 'green', 'opts': ('bold',) },
 
        'SQL_COLTYPE':  { 'fg': 'green' },
 
        'SQL_KEYWORD':  { 'fg': 'blue' },
 
        'SQL_TABLE':    { 'opts': ('bold',) },
 
        'HTTP_INFO':         { 'opts': ('bold',) },
 
        'HTTP_SUCCESS':      { },
 
        'HTTP_REDIRECT':     { 'fg': 'green', 'opts': ('bold',) },
 
        'HTTP_NOT_MODIFIED': { 'fg': 'green' },
 
        'HTTP_BAD_REQUEST':  { 'fg': 'red', 'opts': ('bold',) },
 
        'HTTP_NOT_FOUND':    { 'fg': 'red' },
 
        'HTTP_SERVER_ERROR': { 'fg': 'magenta', 'opts': ('bold',) },
 
    }
 
}
 
DEFAULT_PALETTE = DARK_PALETTE
 

	
 
# ---------------------------- #
 
# --- End of termcolors.py --- #
 
# ---------------------------- #
 

	
 

	
 
class ColoredProgressBar(ProgressBar):
 

	
 
    BAR_COLORS = (
 
        (10, 'red'),
 
        (30, 'magenta'),
 
        (50, 'yellow'),
 
        (99, 'green'),
 
        (100, 'blue'),
 
    )
 

	
 
    def get_line(self):
 
        line = super(ColoredProgressBar, self).get_line()
 
        perc = self.get_percentage()
 
        if perc > 100:
 
            color = 'blue'
 
        for max_perc, color in self.BAR_COLORS:
 
            if perc <= max_perc:
 
                break
 
        return colorize(line, fg=color)
 

	
 

	
 
class AnimatedProgressBar(ProgressBar):
 

	
 
    def get_bar_char(self):
 
        chars = '-/|\\'
 
        if self.step >= self.steps:
 
            return '='
 
        return chars[self.step % len(chars)]
 

	
 

	
 
class BarOnlyProgressBar(ProgressBar):
 

	
 
    default_elements = ['bar', 'steps']
 

	
 
    def get_bar(self):
 
        bar = super(BarOnlyProgressBar, self).get_bar()
 
        perc = self.get_percentage()
 
        perc_text = '%s%%' % int(perc)
 
        text = (' %s%% ' % (perc_text)).center(self.width, '=')
 
        L = text.find(' ')
 
        R = text.rfind(' ')
 
        bar = ' '.join((bar[:L], perc_text, bar[R:]))
 
        return bar
 

	
 

	
 
class AnimatedColoredProgressBar(AnimatedProgressBar,
 
                                 ColoredProgressBar):
 
    pass
 

	
 

	
 
class BarOnlyColoredProgressBar(ColoredProgressBar,
 
                                BarOnlyProgressBar):
 
    pass
 

	
 

	
 
def main():
 
    import time
 

	
 
    print("Standard progress bar...")
 
    bar = ProgressBar(30)
 
    for x in xrange(1, 31):
 
    for x in range(1, 31):
 
        bar.render(x)
 
        time.sleep(0.02)
 
    bar.stream.write('\n')
 
    print()
 

	
 
    print("Empty bar...")
 
    bar = ProgressBar(50)
 
    bar.render(0)
 
    print()
 
    print()
 

	
 
    print("Colored bar...")
 
    bar = ColoredProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Animated char bar...")
 
    bar = AnimatedProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Animated + colored char bar...")
 
    bar = AnimatedColoredProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Bar only ...")
 
    bar = BarOnlyProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Colored, longer bar-only, eta, total time ...")
 
    bar = BarOnlyColoredProgressBar(40)
 
    bar.width = 60
 
    bar.elements += ['time', 'eta']
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 
    print()
 

	
 
    print("File transfer bar, breaks after 2 seconds ...")
 
    total_bytes = 1024 * 1024 * 2
 
    bar = ProgressBar(total_bytes)
 
    bar.width = 50
 
    bar.elements.remove('steps')
 
    bar.elements += ['transfer', 'time', 'eta', 'speed']
 
    for x in xrange(0, bar.steps, 1024):
 
    for x in range(0, bar.steps, 1024):
 
        bar.render(x)
 
        time.sleep(0.01)
 
        now = datetime.datetime.now()
 
        if now - bar.started >= datetime.timedelta(seconds=2):
 
            break
 
    print()
 
    print()
 

	
 

	
 
if __name__ == '__main__':
 
    main()
kallithea/model/gist.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.gist
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
gist model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import os
 
import random
 
import shutil
 
import time
 
import traceback
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.utils2 import AttributeDict, ascii_bytes, safe_int, safe_unicode, time_to_datetime
 
from kallithea.model.db import Gist, Session, User
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
GIST_STORE_LOC = '.rc_gist_store'
 
GIST_METADATA_FILE = '.rc_gist_metadata'
 

	
 

	
 
def make_gist_access_id():
 
    """Generate a random, URL safe, almost certainly unique gist identifier."""
 
    rnd = random.SystemRandom() # use cryptographically secure system PRNG
 
    alphabet = '23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz'
 
    length = 20
 
    return u''.join(rnd.choice(alphabet) for _ in xrange(length))
 
    return u''.join(rnd.choice(alphabet) for _ in range(length))
 

	
 

	
 
class GistModel(object):
 

	
 
    def __delete_gist(self, gist):
 
        """
 
        removes gist from filesystem
 

	
 
        :param gist: gist object
 
        """
 
        root_path = RepoModel().repos_path
 
        rm_path = os.path.join(root_path, GIST_STORE_LOC, gist.gist_access_id)
 
        log.info("Removing %s", rm_path)
 
        shutil.rmtree(rm_path)
 

	
 
    def _store_metadata(self, repo, gist_id, gist_access_id, user_id, gist_type,
 
                        gist_expires):
 
        """
 
        store metadata inside the gist, this can be later used for imports
 
        or gist identification
 
        """
 
        metadata = {
 
            'metadata_version': '1',
 
            'gist_db_id': gist_id,
 
            'gist_access_id': gist_access_id,
 
            'gist_owner_id': user_id,
 
            'gist_type': gist_type,
 
            'gist_expires': gist_expires,
 
            'gist_updated': time.time(),
 
        }
 
        with open(os.path.join(repo.path, '.hg', GIST_METADATA_FILE), 'wb') as f:
 
            f.write(ascii_bytes(ext_json.dumps(metadata)))
 

	
 
    def get_gist(self, gist):
 
        return Gist.guess_instance(gist)
 

	
 
    def get_gist_files(self, gist_access_id, revision=None):
 
        """
 
        Get files for given gist
 

	
 
        :param gist_access_id:
 
        """
 
        repo = Gist.get_by_access_id(gist_access_id)
 
        cs = repo.scm_instance.get_changeset(revision)
 
        return cs, [n for n in cs.get_node('/')]
 

	
 
    def create(self, description, owner, ip_addr, gist_mapping,
 
               gist_type=Gist.GIST_PUBLIC, lifetime=-1):
 
        """
 

	
 
        :param description: description of the gist
 
        :param owner: user who created this gist
 
        :param gist_mapping: mapping {filename:{'content':content},...}
 
        :param gist_type: type of gist private/public
 
        :param lifetime: in minutes, -1 == forever
 
        """
 
        owner = User.guess_instance(owner)
 
        gist_access_id = make_gist_access_id()
 
        lifetime = safe_int(lifetime, -1)
 
        gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        log.debug('set GIST expiration date to: %s',
 
                  time_to_datetime(gist_expires)
 
                   if gist_expires != -1 else 'forever')
 
        # create the Database version
 
        gist = Gist()
 
        gist.gist_description = description
 
        gist.gist_access_id = gist_access_id
 
        gist.owner_id = owner.user_id
 
        gist.gist_expires = gist_expires
 
        gist.gist_type = safe_unicode(gist_type)
 
        Session().add(gist)
 
        Session().flush() # make database assign gist.gist_id
 
        if gist_type == Gist.GIST_PUBLIC:
 
            # use DB ID for easy to use GIST ID
 
            gist.gist_access_id = unicode(gist.gist_id)
 

	
 
        log.debug('Creating new %s GIST repo %s', gist_type, gist.gist_access_id)
 
        repo = RepoModel()._create_filesystem_repo(
 
            repo_name=gist.gist_access_id, repo_type='hg', repo_group=GIST_STORE_LOC)
 

	
 
        processed_mapping = {}
 
        for filename in gist_mapping:
 
            if filename != os.path.basename(filename):
 
                raise Exception('Filename cannot be inside a directory')
 

	
 
            content = gist_mapping[filename]['content']
 
            # TODO: expand support for setting explicit lexers
 
#             if lexer is None:
 
#                 try:
 
#                     guess_lexer = pygments.lexers.guess_lexer_for_filename
 
#                     lexer = guess_lexer(filename,content)
 
#                 except pygments.util.ClassNotFound:
 
#                     lexer = 'text'
 
            processed_mapping[filename] = {'content': content}
 

	
 
        # now create single multifile commit
 
        message = 'added file'
 
        message += 's: ' if len(processed_mapping) > 1 else ': '
 
        message += ', '.join([x for x in processed_mapping])
 

	
 
        # fake Kallithea Repository object
 
        fake_repo = AttributeDict(dict(
 
            repo_name=os.path.join(GIST_STORE_LOC, gist.gist_access_id),
 
            scm_instance_no_cache=lambda: repo,
 
        ))
 
        ScmModel().create_nodes(
 
            user=owner.user_id,
 
            ip_addr=ip_addr,
 
            repo=fake_repo,
 
            message=message,
 
            nodes=processed_mapping,
 
            trigger_push_hook=False
 
        )
 

	
 
        self._store_metadata(repo, gist.gist_id, gist.gist_access_id,
 
                             owner.user_id, gist.gist_type, gist.gist_expires)
 
        return gist
 

	
 
    def delete(self, gist, fs_remove=True):
 
        gist = Gist.guess_instance(gist)
 
        try:
 
            Session().delete(gist)
 
            if fs_remove:
 
                self.__delete_gist(gist)
 
            else:
 
                log.debug('skipping removal from filesystem')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def update(self, gist, description, owner, ip_addr, gist_mapping, gist_type,
 
               lifetime):
 
        gist = Gist.guess_instance(gist)
 
        gist_repo = gist.scm_instance
 

	
 
        lifetime = safe_int(lifetime, -1)
 
        if lifetime == 0:  # preserve old value
 
            gist_expires = gist.gist_expires
 
        else:
 
            gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 

	
 
        # calculate operation type based on given data
 
        gist_mapping_op = {}
 
        for k, v in gist_mapping.items():
 
            # add, mod, del
 
            if not v['org_filename'] and v['filename']:
 
                op = 'add'
 
            elif v['org_filename'] and not v['filename']:
 
                op = 'del'
 
            else:
 
                op = 'mod'
 

	
 
            v['op'] = op
 
            gist_mapping_op[k] = v
 

	
 
        gist.gist_description = description
 
        gist.gist_expires = gist_expires
 
        gist.owner = owner
 
        gist.gist_type = gist_type
 

	
 
        message = 'updated file'
 
        message += 's: ' if len(gist_mapping) > 1 else ': '
 
        message += ', '.join([x for x in gist_mapping])
 

	
 
        # fake Kallithea Repository object
 
        fake_repo = AttributeDict(dict(
 
            repo_name=os.path.join(GIST_STORE_LOC, gist.gist_access_id),
 
            scm_instance_no_cache=lambda: gist_repo,
 
        ))
 

	
 
        self._store_metadata(gist_repo, gist.gist_id, gist.gist_access_id,
 
                             owner.user_id, gist.gist_type, gist.gist_expires)
 

	
 
        ScmModel().update_nodes(
 
            user=owner.user_id,
 
            ip_addr=ip_addr,
 
            repo=fake_repo,
 
            message=message,
 
            nodes=gist_mapping_op,
 
            trigger_push_hook=False
 
        )
 

	
 
        return gist
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Test suite for vcs push/pull operations.
 

	
 
The tests need Git > 1.8.1.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
from __future__ import print_function
 

	
 
import json
 
import os
 
import re
 
import tempfile
 
import time
 
import urllib2
 
from subprocess import PIPE, Popen
 
from tempfile import _RandomNameSequence
 

	
 
import pytest
 

	
 
from kallithea import CONFIG
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model.db import CacheInvalidation, Repository, Ui, User, UserIpMap, UserLog
 
from kallithea.model.meta import Session
 
from kallithea.model.ssh_key import SshKeyModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
DEBUG = True
 
HOST = '127.0.0.1:4999'  # test host
 

	
 
fixture = Fixture()
 

	
 

	
 
# Parameterize different kinds of VCS testing - both the kind of VCS and the
 
# access method (HTTP/SSH)
 

	
 
# Mixin for using HTTP and SSH URLs
 
class HttpVcsTest(object):
 
    @staticmethod
 
    def repo_url_param(webserver, repo_name, **kwargs):
 
        return webserver.repo_url(repo_name, **kwargs)
 

	
 
class SshVcsTest(object):
 
    public_keys = {
 
        base.TEST_USER_REGULAR_LOGIN: u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUQ== kallithea@localhost',
 
        base.TEST_USER_ADMIN_LOGIN: u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQC6Ycnc2oUZHQnQwuqgZqTTdMDZD7ataf3JM7oG2Fw8JR6cdmz4QZLe5mfDwaFwG2pWHLRpVqzfrD/Pn3rIO++bgCJH5ydczrl1WScfryV1hYMJ/4EzLGM657J1/q5EI+b9SntKjf4ax+KP322L0TNQGbZUHLbfG2MwHMrYBQpHUq== kallithea@localhost',
 
    }
 

	
 
    @classmethod
 
    def repo_url_param(cls, webserver, repo_name, username=base.TEST_USER_ADMIN_LOGIN, password=base.TEST_USER_ADMIN_PASS, client_ip=base.IP_ADDR):
 
        user = User.get_by_username(username)
 
        if user.ssh_keys:
 
            ssh_key = user.ssh_keys[0]
 
        else:
 
            sshkeymodel = SshKeyModel()
 
            ssh_key = sshkeymodel.create(user, u'test key', cls.public_keys[user.username])
 
            Session().commit()
 

	
 
        return cls._ssh_param(repo_name, user, ssh_key, client_ip)
 

	
 
# Mixins for using Mercurial and Git
 
class HgVcsTest(object):
 
    repo_type = 'hg'
 
    repo_name = base.HG_REPO
 

	
 
class GitVcsTest(object):
 
    repo_type = 'git'
 
    repo_name = base.GIT_REPO
 

	
 
# Combine mixins to give the combinations we want to parameterize tests with
 
class HgHttpVcsTest(HgVcsTest, HttpVcsTest):
 
    pass
 

	
 
class GitHttpVcsTest(GitVcsTest, HttpVcsTest):
 
    pass
 

	
 
class HgSshVcsTest(HgVcsTest, SshVcsTest):
 
    @staticmethod
 
    def _ssh_param(repo_name, user, ssh_key, client_ip):
 
        # Specify a custom ssh command on the command line
 
        return r"""--config ui.ssh="bash -c 'SSH_ORIGINAL_COMMAND=\"\$2\" SSH_CONNECTION=\"%s 1024 127.0.0.1 22\" kallithea-cli ssh-serve -c %s %s %s' --" ssh://someuser@somehost/%s""" % (
 
            client_ip,
 
            CONFIG['__file__'],
 
            user.user_id,
 
            ssh_key.user_ssh_key_id,
 
            repo_name)
 

	
 
class GitSshVcsTest(GitVcsTest, SshVcsTest):
 
    @staticmethod
 
    def _ssh_param(repo_name, user, ssh_key, client_ip):
 
        # Set a custom ssh command in the global environment
 
        os.environ['GIT_SSH_COMMAND'] = r"""bash -c 'SSH_ORIGINAL_COMMAND="$2" SSH_CONNECTION="%s 1024 127.0.0.1 22" kallithea-cli ssh-serve -c %s %s %s' --""" % (
 
            client_ip,
 
            CONFIG['__file__'],
 
            user.user_id,
 
            ssh_key.user_ssh_key_id)
 
        return "ssh://someuser@somehost/%s""" % repo_name
 

	
 
parametrize_vcs_test = base.parametrize('vt', [
 
    HgHttpVcsTest,
 
    GitHttpVcsTest,
 
    HgSshVcsTest,
 
    GitSshVcsTest,
 
])
 
parametrize_vcs_test_hg = base.parametrize('vt', [
 
    HgHttpVcsTest,
 
    HgSshVcsTest,
 
])
 
parametrize_vcs_test_http = base.parametrize('vt', [
 
    HgHttpVcsTest,
 
    GitHttpVcsTest,
 
])
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, *args, **environ):
 
        """
 
        Runs command on the system with given ``args`` using simple space
 
        join without safe quoting.
 
        """
 
        command = ' '.join(args)
 
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
 
        if DEBUG:
 
            print('*** CMD %s ***' % command)
 
        testenv = dict(os.environ)
 
        testenv['LANG'] = 'en_US.UTF-8'
 
        testenv['LANGUAGE'] = 'en_US:en'
 
        testenv['HGPLAIN'] = ''
 
        testenv['HGRCPATH'] = ''
 
        testenv.update(environ)
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            if stdout:
 
                print('stdout:', stdout)
 
            if stderr:
 
                print('stderr:', stderr)
 
        if not ignoreReturnCode:
 
            assert p.returncode == 0
 
        return stdout, stderr
 

	
 

	
 
def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
 
    return tempfile.mkdtemp(dir=base.TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
 

	
 

	
 
def _add_files(vcs, dest_dir, files_no=3):
 
    """
 
    Generate some files, add it to dest_dir repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param dest_dir:
 
    """
 
    added_file = '%ssetup.py' % next(_RandomNameSequence())
 
    open(os.path.join(dest_dir, added_file), 'a').close()
 
    Command(dest_dir).execute(vcs, 'add', added_file)
 

	
 
    email = 'me@example.com'
 
    if os.name == 'nt':
 
        author_str = 'User <%s>' % email
 
    else:
 
        author_str = 'User ǝɯɐᴎ <%s>' % email
 
    for i in xrange(files_no):
 
    for i in range(files_no):
 
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
 
        Command(dest_dir).execute(cmd)
 
        if vcs == 'hg':
 
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        # git commit needs EMAIL on some machines
 
        Command(dest_dir).execute(cmd, EMAIL=email)
 

	
 
def _add_files_and_push(webserver, vt, dest_dir, clone_url, ignoreReturnCode=False, files_no=3):
 
    _add_files(vt.repo_type, dest_dir, files_no=files_no)
 
    # PUSH it back
 
    stdout = stderr = None
 
    if vt.repo_type == 'hg':
 
        stdout, stderr = Command(dest_dir).execute('hg push -f --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
 
    elif vt.repo_type == 'git':
 
        stdout, stderr = Command(dest_dir).execute('git push -f --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)
 

	
 
    return stdout, stderr
 

	
 

	
 
def _check_outgoing(vcs, cwd, clone_url):
 
    if vcs == 'hg':
 
        # hg removes the password from default URLs, so we have to provide it here via the clone_url
 
        return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True)
 
    elif vcs == 'git':
 
        Command(cwd).execute('git remote update')
 
        return Command(cwd).execute('git log origin/master..master')
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_default_user()
 
    user.active = enable
 
    Session().commit()
 
    if enable != User.get_default_user().active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 

	
 
def _check_proper_git_push(stdout, stderr):
 
    assert 'fatal' not in stderr
 
    assert 'rejected' not in stderr
 
    assert 'Pushing to' in stderr
 
    assert 'master -> master' in stderr
 

	
 

	
 
@pytest.mark.usefixtures("test_context_fixture")
 
class TestVCSOperations(base.TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        # DISABLE ANONYMOUS ACCESS
 
        set_anonymous_access(False)
 

	
 
    @pytest.fixture()
 
    def testhook_cleanup(self):
 
        yield
 
        # remove hook
 
        for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']:
 
            entry = Ui.get_by_key('hooks', '%s.testhook' % hook)
 
            if entry:
 
                Session().delete(entry)
 
        Session().commit()
 

	
 
    @pytest.fixture(scope="module")
 
    def testfork(self):
 
        # create fork so the repo stays untouched
 
        git_fork_name = u'%s_fork%s' % (base.GIT_REPO, next(_RandomNameSequence()))
 
        fixture.create_fork(base.GIT_REPO, git_fork_name)
 
        hg_fork_name = u'%s_fork%s' % (base.HG_REPO, next(_RandomNameSequence()))
 
        fixture.create_fork(base.HG_REPO, hg_fork_name)
 
        return {'git': git_fork_name, 'hg': hg_fork_name}
 

	
 
    @parametrize_vcs_test
 
    def test_clone_repo_by_admin(self, webserver, vt):
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir())
 

	
 
        if vt.repo_type == 'git':
 
            assert 'Cloning into' in stdout + stderr
 
            assert stderr == '' or stdout == ''
 
        elif vt.repo_type == 'hg':
 
            assert 'requesting all changes' in stdout
 
            assert 'adding changesets' in stdout
 
            assert 'adding manifests' in stdout
 
            assert 'adding file changes' in stdout
 
            assert stderr == ''
 

	
 
    @parametrize_vcs_test_http
 
    def test_clone_wrong_credentials(self, webserver, vt):
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name, password='bad!')
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        if vt.repo_type == 'git':
 
            assert 'fatal: Authentication failed' in stderr
 
        elif vt.repo_type == 'hg':
 
            assert 'abort: authorization failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self, webserver):
 
        clone_url = HgHttpVcsTest.repo_url_param(webserver, base.GIT_REPO)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr or "not a valid repository" in stdout and 'abort:' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self, webserver):
 
        clone_url = GitHttpVcsTest.repo_url_param(webserver, base.HG_REPO)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    @parametrize_vcs_test
 
    def test_clone_non_existing_path(self, webserver, vt):
 
        clone_url = vt.repo_url_param(webserver, 'trololo')
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        if vt.repo_type == 'git':
 
            assert 'not found' in stderr or 'abort: Access to %r denied' % 'trololo' in stderr
 
        elif vt.repo_type == 'hg':
 
            assert 'HTTP Error 404: Not Found' in stderr or 'abort: no suitable response from remote hg' in stderr and 'remote: abort: Access to %r denied' % 'trololo' in stdout
 

	
 
    @parametrize_vcs_test
 
    def test_push_new_repo(self, webserver, vt):
 
        # Clear the log so we know what is added
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        # Create an empty server repo using the API
 
        repo_name = u'new_%s_%s' % (vt.repo_type, next(_RandomNameSequence()))
 
        usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        params = {
 
            "id": 7,
 
            "api_key": usr.api_key,
 
            "method": 'create_repo',
 
            "args": dict(repo_name=repo_name,
 
                         owner=base.TEST_USER_ADMIN_LOGIN,
 
                         repo_type=vt.repo_type),
 
        }
 
        req = urllib2.Request(
 
            'http://%s:%s/_admin/api' % webserver.server_address,
 
            data=ascii_bytes(json.dumps(params)),
 
            headers={'content-type': 'application/json'})
 
        response = urllib2.urlopen(req)
 
        result = json.loads(response.read())
 
        # Expect something like:
 
        # {u'result': {u'msg': u'Created new repository `new_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None}
 
        assert result[u'result'][u'success']
 

	
 
        # Create local clone of the empty server repo
 
        local_clone_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, local_clone_dir)
 

	
 
        # Make 3 commits and push to the empty server repo.
 
        # The server repo doesn't have any other heads than the
 
        # refs/heads/master we are pushing, but the `git log` in the push hook
 
        # should still list the 3 commits.
 
        stdout, stderr = _add_files_and_push(webserver, vt, local_clone_dir, clone_url=clone_url)
 
        if vt.repo_type == 'git':
 
            _check_proper_git_push(stdout, stderr)
 
        elif vt.repo_type == 'hg':
 
            assert 'pushing to ' in stdout
 
            assert 'remote: added ' in stdout
 

	
 
        # Verify that we got the right events in UserLog. Expect something like:
 
        # <UserLog('id:new_git_XXX:started_following_repo')>
 
        # <UserLog('id:new_git_XXX:user_created_repo')>
 
        # <UserLog('id:new_git_XXX:pull')>
 
        # <UserLog('id:new_git_XXX:push:aed9d4c1732a1927da3be42c47eb9afdc200d427,d38b083a07af10a9f44193486959a96a23db78da,4841ff9a2b385bec995f4679ef649adb3f437622')>
 
        action_parts = [ul.action.split(':', 1) for ul in UserLog.query().order_by(UserLog.user_log_id)]
 
        assert [(t[0], (t[1].count(',') + 1) if len(t) == 2 else 0) for t in action_parts] == ([
 
            (u'started_following_repo', 0),
 
            (u'user_created_repo', 0),
 
            (u'pull', 0),
 
            (u'push', 3)]
 
            if vt.repo_type == 'git' else [
 
            (u'started_following_repo', 0),
 
            (u'user_created_repo', 0),
 
            # (u'pull', 0), # Mercurial outgoing hook is not called for empty clones
 
            (u'push', 3)])
 

	
 
    @parametrize_vcs_test
 
    def test_push_new_file(self, webserver, testfork, vt):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        dest_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
kallithea/tests/vcs/test_archives.py
Show inline comments
 
import datetime
 
import io
 
import os
 
import tarfile
 
import tempfile
 
import zipfile
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import TESTS_TMP_PATH
 

	
 

	
 
class ArchivesTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('%d/file_%d.txt' % (x, x),
 
                        content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_archive_zip(self):
 
        path = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_zip-')[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='zip', prefix='repo')
 
        out = zipfile.ZipFile(path)
 

	
 
        for x in xrange(5):
 
        for x in range(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            decompressed = out.read('repo/' + node_path)
 
            assert decompressed == self.tip.get_node(node_path).content
 

	
 
    def test_archive_tgz(self):
 
        path = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_tgz-')[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='tgz', prefix='repo')
 
        outdir = tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix='test_archive_tgz-', suffix='-outdir')
 

	
 
        outfile = tarfile.open(path, 'r|gz')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
        for x in range(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            assert open(os.path.join(outdir, 'repo/' + node_path), 'rb').read() == self.tip.get_node(node_path).content
 

	
 
    def test_archive_tbz2(self):
 
        path = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_tbz2-')[1]
 
        with open(path, 'w+b') as f:
 
            self.tip.fill_archive(stream=f, kind='tbz2', prefix='repo')
 
        outdir = tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix='test_archive_tbz2-', suffix='-outdir')
 

	
 
        outfile = tarfile.open(path, 'r|bz2')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
        for x in range(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            assert open(os.path.join(outdir, 'repo/' + node_path), 'rb').read() == self.tip.get_node(node_path).content
 

	
 
    def test_archive_default_stream(self):
 
        tmppath = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_default_stream-')[1]
 
        with open(tmppath, 'wb') as stream:
 
            self.tip.fill_archive(stream=stream)
 
        mystream = io.BytesIO()
 
        self.tip.fill_archive(stream=mystream)
 
        mystream.seek(0)
 
        with open(tmppath, 'rb') as f:
 
            file_content = f.read()
 
            stringio_content = mystream.read()
 
            # the gzip header contains a MTIME header
 
            # because is takes a little bit of time from one fill_archive call to the next
 
            # this part may differ so don't include that part in the comparison
 
            assert file_content[:4] == stringio_content[:4]
 
            assert file_content[8:] == stringio_content[8:]
 

	
 
    def test_archive_wrong_kind(self):
 
        with pytest.raises(VCSError):
 
            self.tip.fill_archive(kind='wrong kind')
 

	
 
    def test_archive_empty_prefix(self):
 
        with pytest.raises(VCSError):
 
            self.tip.fill_archive(prefix='')
 

	
 
    def test_archive_prefix_with_leading_slash(self):
 
        with pytest.raises(VCSError):
 
            self.tip.fill_archive(prefix='/any')
 

	
 

	
 
class TestGitArchive(ArchivesTestCaseMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgArchive(ArchivesTestCaseMixin):
 
    backend_alias = 'hg'
kallithea/tests/vcs/test_changesets.py
Show inline comments
 
# encoding: utf-8
 

	
 
import datetime
 

	
 
import pytest
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.vcs.backends.base import BaseChangeset
 
from kallithea.lib.vcs.exceptions import BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError
 
from kallithea.lib.vcs.nodes import AddedFileNodesGenerator, ChangedFileNodesGenerator, FileNode, RemovedFileNodesGenerator
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class TestBaseChangeset(object):
 

	
 
    def test_as_dict(self):
 
        changeset = BaseChangeset()
 
        changeset.raw_id = 'RAW_ID'
 
        changeset.short_id = 'SHORT_ID'
 
        changeset.revision = 1009
 
        changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
 
        changeset.message = 'Message of a commit'
 
        changeset.author = 'Joe Doe <joe.doe@example.com>'
 
        changeset.added = [FileNode('foo/bar/baz'), FileNode(u'foobar'), FileNode(u'blåbærgrød')]
 
        changeset.changed = []
 
        changeset.removed = []
 
        assert changeset.as_dict() == {
 
            'raw_id': 'RAW_ID',
 
            'short_id': 'SHORT_ID',
 
            'revision': 1009,
 
            'date': datetime.datetime(2011, 1, 30, 1, 45),
 
            'message': 'Message of a commit',
 
            'author': {
 
                'name': 'Joe Doe',
 
                'email': 'joe.doe@example.com',
 
            },
 
            'added': ['foo/bar/baz', 'foobar', u'bl\xe5b\xe6rgr\xf8d'],
 
            'changed': [],
 
            'removed': [],
 
        }
 

	
 

	
 
class _ChangesetsWithCommitsTestCaseixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_new_branch(self):
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        assert 'foobar' in self.repo.branches
 
        assert foobar_tip.branch == 'foobar'
 
        assert foobar_tip.branches == ['foobar']
 
        # 'foobar' should be the only branch that contains the new commit
 
        branch_tips = list(self.repo.branches.values())
 
        assert branch_tips.count(str(foobar_tip.raw_id)) == 1
 

	
 
    def test_new_head_in_default_branch(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
 
        )
 

	
 
        newest_tip = self.imc.commit(
 
            message=u'Merged with %s' % foobar_tip.raw_id,
 
            author=u'joe',
 
            branch=self.backend_class.DEFAULT_BRANCH_NAME,
 
            parents=[newtip, foobar_tip],
 
        )
 

	
 
        assert newest_tip.branch == self.backend_class.DEFAULT_BRANCH_NAME
 
        assert newest_tip.branches == [self.backend_class.DEFAULT_BRANCH_NAME]
 

	
 
    def test_get_changesets_respects_branch_name(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        doc_changeset = self.imc.commit(
 
            message=u'New branch: docs',
 
            author=u'joe',
 
            branch='docs',
 
        )
 
        self.imc.add(vcs.nodes.FileNode('newfile', content=''))
 
        self.imc.commit(
 
            message=u'Back in default branch',
 
            author=u'joe',
 
            parents=[tip],
 
        )
 
        default_branch_changesets = self.repo.get_changesets(
 
            branch_name=self.repo.DEFAULT_BRANCH_NAME)
 
        assert doc_changeset not in default_branch_changesets
 

	
 
    def test_get_changeset_by_branch(self):
 
        for branch, sha in self.repo.branches.items():
 
            assert sha == self.repo.get_changeset(branch).raw_id
 

	
 
    def test_get_changeset_by_tag(self):
 
        for tag, sha in self.repo.tags.items():
 
            assert sha == self.repo.get_changeset(tag).raw_id
 

	
 
    def test_get_changeset_parents(self):
 
        for test_rev in [1, 2, 3]:
 
            sha = self.repo.get_changeset(test_rev-1)
 
            assert [sha] == self.repo.get_changeset(test_rev).parents
 

	
 
    def test_get_changeset_children(self):
 
        for test_rev in [1, 2, 3]:
 
            sha = self.repo.get_changeset(test_rev+1)
 
            assert [sha] == self.repo.get_changeset(test_rev).children
 

	
 

	
 
class _ChangesetsTestCaseMixin(_BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': u'Commit %d' % x,
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_simple(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.date == datetime.datetime(2010, 1, 3, 20)
 

	
 
    def test_get_changesets_is_ordered_by_date(self):
 
        changesets = list(self.repo.get_changesets())
 
        ordered_by_date = sorted(changesets,
 
            key=lambda cs: cs.date)
 

	
 
        assert changesets == ordered_by_date
 

	
 
    def test_get_changesets_respects_start(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        assert len(changesets) == 4
 

	
 
    def test_get_changesets_numerical_id_respects_start(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        assert len(changesets) == 4
 

	
 
    def test_get_changesets_includes_start_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        assert changesets[0].raw_id == second_id
 

	
 
    def test_get_changesets_respects_end(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        assert changesets[-1].raw_id == second_id
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_numerical_id_respects_end(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        assert changesets.index(changesets[-1]) == second_id
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_respects_both_start_and_end(self):
 
        second_id = self.repo.revisions[1]
 
        third_id = self.repo.revisions[2]
 
        changesets = list(self.repo.get_changesets(start=second_id,
 
            end=third_id))
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end(self):
 
        changesets = list(self.repo.get_changesets(start=2, end=3))
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_on_empty_repo_raises_EmptyRepository_error(self):
 
        repo = self.setup_empty_repo(self.backend_class)
 
        with pytest.raises(EmptyRepositoryError):
 
            list(repo.get_changesets(start='foobar'))
 

	
 
    def test_get_changesets_includes_end_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        assert changesets[-1].raw_id == second_id
 

	
 
    def test_get_changesets_respects_start_date(self):
 
        start_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date):
 
            assert cs.date >= start_date
 

	
 
    def test_get_changesets_respects_end_date(self):
 
        start_date = datetime.datetime(2010, 1, 1)
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date,
 
                                           end_date=end_date):
 
            assert cs.date >= start_date
 
            assert cs.date <= end_date
 

	
 
    def test_get_changesets_respects_start_date_and_end_date(self):
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(end_date=end_date):
 
            assert cs.date <= end_date
 

	
 
    def test_get_changesets_respects_reverse(self):
 
        changesets_id_list = [cs.raw_id for cs in
 
            self.repo.get_changesets(reverse=True)]
 
        assert changesets_id_list == list(reversed(self.repo.revisions))
 

	
 
    def test_get_filenodes_generator(self):
 
        tip = self.repo.get_changeset()
 
        filepaths = [node.path for node in tip.get_filenodes_generator()]
 
        assert filepaths == ['file_%d.txt' % x for x in xrange(5)]
 
        assert filepaths == ['file_%d.txt' % x for x in range(5)]
 

	
 
    def test_size(self):
 
        tip = self.repo.get_changeset()
 
        size = 5 * len('Foobar N') # Size of 5 files
 
        assert tip.size == size
 

	
 
    def test_author(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.author == u'Joe Doe <joe.doe@example.com>'
 

	
 
    def test_author_name(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.author_name == u'Joe Doe'
 

	
 
    def test_author_email(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.author_email == u'joe.doe@example.com'
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_start(self):
 
        with pytest.raises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(start='foobar'))
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_end(self):
 
        with pytest.raises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(end='foobar'))
 

	
 
    def test_get_changesets_raise_branchdoesnotexist_for_wrong_branch_name(self):
 
        with pytest.raises(BranchDoesNotExistError):
 
            list(self.repo.get_changesets(branch_name='foobar'))
 

	
 
    def test_get_changesets_raise_repositoryerror_for_wrong_start_end(self):
 
        start = self.repo.revisions[-1]
 
        end = self.repo.revisions[0]
 
        with pytest.raises(RepositoryError):
 
            list(self.repo.get_changesets(start=start, end=end))
 

	
 
    def test_get_changesets_numerical_id_reversed(self):
 
        with pytest.raises(RepositoryError):
 
            [x for x in self.repo.get_changesets(start=3, end=2)]
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end_last(self):
 
        with pytest.raises(RepositoryError):
 
            last = len(self.repo.revisions)
 
            list(self.repo.get_changesets(start=last-1, end=last-2))
 

	
 
    def test_get_changesets_numerical_id_last_zero_error(self):
 
        with pytest.raises(RepositoryError):
 
            last = len(self.repo.revisions)
 
            list(self.repo.get_changesets(start=last-1, end=0))
 

	
 

	
 
class _ChangesetsChangesTestCaseMixin(_BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': u'Initial',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foo/bar', content='foo'),
 
                    FileNode('foo/bał', content='foo'),
 
                    FileNode('foobar', content='foo'),
 
                    FileNode('qwe', content='foo'),
 
                ],
 
            },
 
            {
 
                'message': u'Massive changes',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [FileNode('fallout', content='War never changes')],
 
                'changed': [
 
                    FileNode('foo/bar', content='baz'),
 
                    FileNode('foobar', content='baz'),
 
                ],
 
                'removed': [FileNode('qwe')],
 
            },
 
        ]
 

	
 
    def test_initial_commit(self):
 
        changeset = self.repo.get_changeset(0)
 
        assert sorted(list(changeset.added)) == sorted([
 
            changeset.get_node('foo/bar'),
 
            changeset.get_node('foo/bał'),
 
            changeset.get_node('foobar'),
 
            changeset.get_node('qwe'),
 
        ])
 
        assert list(changeset.changed) == []
 
        assert list(changeset.removed) == []
 
        assert u'foo/ba\u0142' in changeset.as_dict()['added']
 
        assert u'foo/ba\u0142' in changeset.__json__(with_file_list=True)['added']
 

	
 
    def test_head_added(self):
 
        changeset = self.repo.get_changeset()
 
        assert isinstance(changeset.added, AddedFileNodesGenerator)
 
        assert list(changeset.added) == [
 
            changeset.get_node('fallout'),
 
        ]
 
        assert isinstance(changeset.changed, ChangedFileNodesGenerator)
 
        assert list(changeset.changed) == [
 
            changeset.get_node('foo/bar'),
 
            changeset.get_node('foobar'),
 
        ]
 
        assert isinstance(changeset.removed, RemovedFileNodesGenerator)
 
        assert len(changeset.removed) == 1
 
        assert list(changeset.removed)[0].path == 'qwe'
 

	
 
    def test_get_filemode(self):
 
        changeset = self.repo.get_changeset()
 
        assert 33188 == changeset.get_file_mode('foo/bar')
 

	
 
    def test_get_filemode_non_ascii(self):
 
        changeset = self.repo.get_changeset()
 
        assert 33188 == changeset.get_file_mode('foo/bał')
 
        assert 33188 == changeset.get_file_mode(u'foo/bał')
 

	
 

	
 
class TestGitChangesetsWithCommits(_ChangesetsWithCommitsTestCaseixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestGitChangesets(_ChangesetsTestCaseMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestGitChangesetsChanges(_ChangesetsChangesTestCaseMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgChangesetsWithCommits(_ChangesetsWithCommitsTestCaseixin):
 
    backend_alias = 'hg'
 

	
 

	
 
class TestHgChangesets(_ChangesetsTestCaseMixin):
 
    backend_alias = 'hg'
 

	
 

	
 
class TestHgChangesetsChanges(_ChangesetsChangesTestCaseMixin):
 
    backend_alias = 'hg'
kallithea/tests/vcs/test_getitem.py
Show inline comments
 
import datetime
 

	
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class GetitemTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getitem__last_item_is_tip(self):
 
        assert self.repo[-1] == self.repo.get_changeset()
 

	
 
    def test__getitem__returns_correct_items(self):
 
        changesets = [self.repo[x] for x in xrange(len(self.repo.revisions))]
 
        changesets = [self.repo[x] for x in range(len(self.repo.revisions))]
 
        assert changesets == list(self.repo.get_changesets())
 

	
 

	
 
class TestGitGetitem(GetitemTestCaseMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgGetitem(GetitemTestCaseMixin):
 
    backend_alias = 'hg'
kallithea/tests/vcs/test_getslice.py
Show inline comments
 
import datetime
 

	
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class GetsliceTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
        for x in range(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getslice__last_item_is_tip(self):
 
        assert list(self.repo[-1:])[0] == self.repo.get_changeset()
 

	
 
    def test__getslice__respects_start_index(self):
 
        assert list(self.repo[2:]) == [self.repo.get_changeset(rev) for rev in self.repo.revisions[2:]]
 

	
 
    def test__getslice__respects_negative_start_index(self):
 
        assert list(self.repo[-2:]) == [self.repo.get_changeset(rev) for rev in self.repo.revisions[-2:]]
 

	
 
    def test__getslice__respects_end_index(self):
 
        assert list(self.repo[:2]) == [self.repo.get_changeset(rev) for rev in self.repo.revisions[:2]]
 

	
 
    def test__getslice__respects_negative_end_index(self):
 
        assert list(self.repo[:-2]) == [self.repo.get_changeset(rev) for rev in self.repo.revisions[:-2]]
 

	
 

	
 
class TestGitGetslice(GetsliceTestCaseMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgGetslice(GetsliceTestCaseMixin):
 
    backend_alias = 'hg'
kallithea/tests/vcs/test_inmemchangesets.py
Show inline comments
 
@@ -111,223 +111,223 @@ class InMemoryChangesetTestMixin(_Backen
 
            self.imc.add(node)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        assert changeset == newtip
 
        assert rev_count + 1 == len(self.repo.revisions)
 
        assert newtip.message == message
 
        assert newtip.author == author
 
        assert not any((
 
            self.imc.added,
 
            self.imc.changed,
 
            self.imc.removed
 
        ))
 
        for node in to_add:
 
            assert newtip.get_node(node.path).content == node.content
 

	
 
    def test_add_raise_already_added(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        with pytest.raises(NodeAlreadyAddedError):
 
            self.imc.add(node)
 

	
 
    def test_check_integrity_raise_already_exist(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        self.imc.add(node)
 
        with pytest.raises(NodeAlreadyExistsError):
 
            self.imc.commit(message='new message',
 
                            author=str(self))
 

	
 
    def test_change(self):
 
        self.imc.add(FileNode('foo/bar/baz', content='foo'))
 
        self.imc.add(FileNode('foo/fbar', content='foobar'))
 
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
 

	
 
        # Change node's content
 
        node = FileNode('foo/bar/baz', content='My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
 

	
 
        newtip = self.repo.get_changeset()
 
        assert tip != newtip
 
        assert tip.raw_id != newtip.raw_id
 
        assert newtip.get_node('foo/bar/baz').content == b'My **changed** content'
 

	
 
    def test_change_non_ascii(self):
 
        to_add = [
 
            FileNode('żółwik/zwierzątko', content='ćććć'),
 
            FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
 
        ]
 
        for node in to_add:
 
            self.imc.add(node)
 

	
 
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
 

	
 
        # Change node's content
 
        node = FileNode('żółwik/zwierzątko', content='My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % safe_unicode(node.path),
 
                        u'joe.doe@example.com')
 

	
 
        node = FileNode(u'żółwik/zwierzątko_uni', content=u'My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % safe_unicode(node.path),
 
                        u'joe.doe@example.com')
 

	
 
        newtip = self.repo.get_changeset()
 
        assert tip != newtip
 
        assert tip.raw_id != newtip.raw_id
 

	
 
        assert newtip.get_node('żółwik/zwierzątko').content == b'My **changed** content'
 
        assert newtip.get_node('żółwik/zwierzątko_uni').content == b'My **changed** content'
 

	
 
    def test_change_raise_empty_repository(self):
 
        node = FileNode('foobar')
 
        with pytest.raises(EmptyRepositoryError):
 
            self.imc.change(node)
 

	
 
    def test_check_integrity_change_raise_node_does_not_exist(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        node = FileNode('not-foobar', content='')
 
        self.imc.change(node)
 
        with pytest.raises(NodeDoesNotExistError):
 
            self.imc.commit(message='Changed not existing node', author=str(self))
 

	
 
    def test_change_raise_node_already_changed(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        node = FileNode('foobar', content='more baz')
 
        self.imc.change(node)
 
        with pytest.raises(NodeAlreadyChangedError):
 
            self.imc.change(node)
 

	
 
    def test_check_integrity_change_raise_node_not_changed(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path, content=self.nodes[0].content)
 
        self.imc.change(node)
 
        with pytest.raises(NodeNotChangedError):
 
            self.imc.commit(
 
                message=u'Trying to mark node as changed without touching it',
 
                author=unicode(self)
 
            )
 

	
 
    def test_change_raise_node_already_removed(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        self.imc.remove(FileNode('foobar'))
 
        with pytest.raises(NodeAlreadyRemovedError):
 
            self.imc.change(node)
 

	
 
    def test_remove(self):
 
        self.test_add()  # Performs first commit
 

	
 
        tip = self.repo.get_changeset()
 
        node = self.nodes[0]
 
        assert node.content == tip.get_node(node.path).content
 
        self.imc.remove(node)
 
        self.imc.commit(message=u'Removed %s' % node.path, author=unicode(self))
 

	
 
        newtip = self.repo.get_changeset()
 
        assert tip != newtip
 
        assert tip.raw_id != newtip.raw_id
 
        with pytest.raises(NodeDoesNotExistError):
 
            newtip.get_node(node.path)
 

	
 
    def test_remove_last_file_from_directory(self):
 
        node = FileNode('omg/qwe/foo/bar', content='foobar')
 
        self.imc.add(node)
 
        self.imc.commit(u'added', u'joe doe')
 

	
 
        self.imc.remove(node)
 
        tip = self.imc.commit(u'removed', u'joe doe')
 
        with pytest.raises(NodeDoesNotExistError):
 
            tip.get_node('omg/qwe/foo/bar')
 

	
 
    def test_remove_raise_node_does_not_exist(self):
 
        self.imc.remove(self.nodes[0])
 
        with pytest.raises(NodeDoesNotExistError):
 
            self.imc.commit(
 
                message='Trying to remove node at empty repository',
 
                author=str(self)
 
            )
 

	
 
    def test_check_integrity_remove_raise_node_does_not_exist(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode('no-such-file')
 
        self.imc.remove(node)
 
        with pytest.raises(NodeDoesNotExistError):
 
            self.imc.commit(
 
                message=u'Trying to remove not existing node',
 
                author=unicode(self)
 
            )
 

	
 
    def test_remove_raise_node_already_removed(self):
 
        self.test_add() # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path)
 
        self.imc.remove(node)
 
        with pytest.raises(NodeAlreadyRemovedError):
 
            self.imc.remove(node)
 

	
 
    def test_remove_raise_node_already_changed(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path, content='Bending time')
 
        self.imc.change(node)
 
        with pytest.raises(NodeAlreadyChangedError):
 
            self.imc.remove(node)
 

	
 
    def test_reset(self):
 
        self.imc.add(FileNode('foo', content='bar'))
 
        #self.imc.change(FileNode('baz', content='new'))
 
        #self.imc.remove(FileNode('qwe'))
 
        self.imc.reset()
 
        assert not any((
 
            self.imc.added,
 
            self.imc.changed,
 
            self.imc.removed
 
        ))
 

	
 
    def test_multiple_commits(self):
 
        N = 3  # number of commits to perform
 
        last = None
 
        for x in xrange(N):
 
        for x in range(N):
 
            fname = 'file%s' % str(x).rjust(5, '0')
 
            content = 'foobar\n' * x
 
            node = FileNode(fname, content=content)
 
            self.imc.add(node)
 
            commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs')
 
            assert last != commit
 
            last = commit
 

	
 
        # Check commit number for same repo
 
        assert len(self.repo.revisions) == N
 

	
 
        # Check commit number for recreated repo
 
        assert len(self.repo.revisions) == N
 

	
 
    def test_date_attr(self):
 
        node = FileNode('foobar.txt', content='Foobared!')
 
        self.imc.add(node)
 
        date = datetime.datetime(1985, 1, 30, 1, 45)
 
        commit = self.imc.commit(u"Committed at time when I was born ;-)",
 
            author=u'lb <lb@example.com>', date=date)
 

	
 
        assert commit.date == date
 

	
 

	
 
class TestGitInMemoryChangeset(InMemoryChangesetTestMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgInMemoryChangeset(InMemoryChangesetTestMixin):
 
    backend_alias = 'hg'
0 comments (0 inline, 0 general)