Changeset - 469b16f3979a
[Not reviewed]
default
0 3 0
Mads Kiilerich - 6 years ago 2019-08-06 21:08:02
mads@kiilerich.com
vcs: drop get_total_seconds - we only support Python 2.7 which has timedelta.total_seconds()
3 files changed with 2 insertions and 31 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
@@ -176,60 +176,48 @@ def parse_datetime(text):
 
        '%m/%d/%Y %H:%M',
 
        '%m/%d/%Y',
 
        '%m/%d/%y %H:%M:%S',
 
        '%m/%d/%y %H:%M',
 
        '%m/%d/%y',
 
    )
 
    for format in INPUT_FORMATS:
 
        try:
 
            return datetime.datetime(*time.strptime(text, format)[:6])
 
        except ValueError:
 
            pass
 

	
 
    # Try descriptive texts
 
    if text == 'tomorrow':
 
        future = datetime.datetime.now() + datetime.timedelta(days=1)
 
        args = future.timetuple()[:3] + (23, 59, 59)
 
        return datetime.datetime(*args)
 
    elif text == 'today':
 
        return datetime.datetime(*datetime.datetime.today().timetuple()[:3])
 
    elif text == 'now':
 
        return datetime.datetime.now()
 
    elif text == 'yesterday':
 
        past = datetime.datetime.now() - datetime.timedelta(days=1)
 
        return datetime.datetime(*past.timetuple()[:3])
 
    else:
 
        days = 0
 
        matched = re.match(
 
            r'^((?P<weeks>\d+) ?w(eeks?)?)? ?((?P<days>\d+) ?d(ays?)?)?$', text)
 
        if matched:
 
            groupdict = matched.groupdict()
 
            if groupdict['days']:
 
                days += int(matched.groupdict()['days'])
 
            if groupdict['weeks']:
 
                days += int(matched.groupdict()['weeks']) * 7
 
            past = datetime.datetime.now() - datetime.timedelta(days=days)
 
            return datetime.datetime(*past.timetuple()[:3])
 

	
 
    raise ValueError('Wrong date: "%s"' % text)
 

	
 

	
 
def get_dict_for_attrs(obj, attrs):
 
    """
 
    Returns dictionary for each attribute from given ``obj``.
 
    """
 
    data = {}
 
    for attr in attrs:
 
        data[attr] = getattr(obj, attr)
 
    return data
 

	
 

	
 
def get_total_seconds(timedelta):
 
    """
 
    Backported for Python 2.5.
 

	
 
    See http://docs.python.org/library/datetime.html.
 
    """
 
    return ((timedelta.microseconds + (
 
            timedelta.seconds +
 
            timedelta.days * 24 * 60 * 60
 
        ) * 10**6) / 10**6)
kallithea/lib/vcs/utils/progressbar.py
Show inline comments
 
# encoding: UTF-8
 
import sys
 
import datetime
 
import string
 

	
 
from kallithea.lib.vcs.utils.filesize import filesizeformat
 
from kallithea.lib.vcs.utils.helpers import get_total_seconds
 

	
 

	
 
class ProgressBarError(Exception):
 
    pass
 

	
 

	
 
class AlreadyFinishedError(ProgressBarError):
 
    pass
 

	
 

	
 
class ProgressBar(object):
 

	
 
    default_elements = ['percentage', 'bar', 'steps']
 

	
 
    def __init__(self, steps=100, stream=None, elements=None):
 
        self.step = 0
 
        self.steps = steps
 
        self.stream = stream or sys.stderr
 
        self.bar_char = '='
 
        self.width = 50
 
        self.separator = ' | '
 
        self.elements = elements or self.default_elements
 
        self.started = None
 
        self.finished = False
 
        self.steps_label = 'Step'
 
        self.time_label = 'Time'
 
        self.eta_label = 'ETA'
 
        self.speed_label = 'Speed'
 
        self.transfer_label = 'Transfer'
 

	
 
    def __str__(self):
 
        return self.get_line()
 

	
 
    def __iter__(self):
 
        start = self.step
 
        end = self.steps + 1
 
        for x in xrange(start, end):
 
            self.render(x)
 
            yield x
 

	
 
    def get_separator(self):
 
        return self.separator
 

	
 
    def get_bar_char(self):
 
        return self.bar_char
 

	
 
    def get_bar(self):
 
        char = self.get_bar_char()
 
        perc = self.get_percentage()
 
        length = int(self.width * perc / 100)
 
        bar = char * length
 
        bar = bar.ljust(self.width)
 
        return bar
 

	
 
    def get_elements(self):
 
        return self.elements
 

	
 
    def get_template(self):
 
        separator = self.get_separator()
 
        elements = self.get_elements()
 
        return string.Template(separator.join((('$%s' % e) for e in elements)))
 

	
 
    def get_total_time(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if not self.started:
 
            return datetime.timedelta()
 
        return current_time - self.started
 

	
 
    def get_rendered_total_time(self):
 
        delta = self.get_total_time()
 
        if not delta:
 
            ttime = '-'
 
        else:
 
            ttime = str(delta)
 
        return '%s %s' % (self.time_label, ttime)
 

	
 
    def get_eta(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if self.step == 0:
 
            return datetime.timedelta()
 
        total_seconds = get_total_seconds(self.get_total_time())
 
        total_seconds = self.get_total_time().total_seconds()
 
        eta_seconds = total_seconds * self.steps / self.step - total_seconds
 
        return datetime.timedelta(seconds=int(eta_seconds))
 

	
 
    def get_rendered_eta(self):
 
        eta = self.get_eta()
 
        if not eta:
 
            eta = '--:--:--'
 
        else:
 
            eta = str(eta).rjust(8)
 
        return '%s: %s' % (self.eta_label, eta)
 

	
 
    def get_percentage(self):
 
        return float(self.step) / self.steps * 100
 

	
 
    def get_rendered_percentage(self):
 
        perc = self.get_percentage()
 
        return ('%s%%' % (int(perc))).rjust(5)
 

	
 
    def get_rendered_steps(self):
 
        return '%s: %s/%s' % (self.steps_label, self.step, self.steps)
 

	
 
    def get_rendered_speed(self, step=None, total_seconds=None):
 
        if step is None:
 
            step = self.step
 
        if total_seconds is None:
 
            total_seconds = get_total_seconds(self.get_total_time())
 
            total_seconds = self.get_total_time().total_seconds()
 
        if step <= 0 or total_seconds <= 0:
 
            speed = '-'
 
        else:
 
            speed = filesizeformat(float(step) / total_seconds)
 
        return '%s: %s/s' % (self.speed_label, speed)
 

	
 
    def get_rendered_transfer(self, step=None, steps=None):
 
        if step is None:
 
            step = self.step
 
        if steps is None:
 
            steps = self.steps
 

	
 
        if steps <= 0:
 
            return '%s: -' % self.transfer_label
 
        total = filesizeformat(float(steps))
 
        if step <= 0:
 
            transferred = '-'
 
        else:
 
            transferred = filesizeformat(float(step))
 
        return '%s: %s / %s' % (self.transfer_label, transferred, total)
 

	
 
    def get_context(self):
 
        return {
 
            'percentage': self.get_rendered_percentage(),
 
            'bar': self.get_bar(),
 
            'steps': self.get_rendered_steps(),
 
            'time': self.get_rendered_total_time(),
 
            'eta': self.get_rendered_eta(),
 
            'speed': self.get_rendered_speed(),
 
            'transfer': self.get_rendered_transfer(),
 
        }
 

	
 
    def get_line(self):
 
        template = self.get_template()
 
        context = self.get_context()
 
        return template.safe_substitute(**context)
 

	
 
    def write(self, data):
 
        self.stream.write(data)
 

	
 
    def render(self, step):
 
        if not self.started:
 
            self.started = datetime.datetime.now()
 
        if self.finished:
 
            raise AlreadyFinishedError
 
        self.step = step
 
        self.write('\r%s' % self)
 
        if step == self.steps:
kallithea/tests/vcs/test_utils.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import os
 
import mock
 
import time
 
import shutil
 
import datetime
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs.utils.paths import get_dirs_for_path
 
from kallithea.lib.vcs.utils.helpers import get_dict_for_attrs
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.helpers import get_scms_for_path
 
from kallithea.lib.vcs.utils.helpers import get_total_seconds
 
from kallithea.lib.vcs.utils.helpers import parse_changesets
 
from kallithea.lib.vcs.utils.helpers import parse_datetime
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.vcs.utils.paths import get_user_home
 
from kallithea.lib.vcs.exceptions import VCSError
 

	
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_GIT_REPO, TESTS_TMP_PATH
 

	
 

	
 
class TestPaths(object):
 

	
 
    def _test_get_dirs_for_path(self, path, expected):
 
        """
 
        Tests if get_dirs_for_path returns same as expected.
 
        """
 
        expected = sorted(expected)
 
        result = sorted(get_dirs_for_path(path))
 
        assert result == expected, \
 
            "%s != %s which was expected result for path %s" % (result, expected, path)
 

	
 
    def test_get_dirs_for_path(self):
 
        path = 'foo/bar/baz/file'
 
        paths_and_results = (
 
            ('foo/bar/baz/file', ['foo', 'foo/bar', 'foo/bar/baz']),
 
            ('foo/bar/', ['foo', 'foo/bar']),
 
            ('foo/bar', ['foo']),
 
        )
 
        for path, expected in paths_and_results:
 
            self._test_get_dirs_for_path(path, expected)
 

	
 
    def test_get_scm(self):
 
        assert ('hg', TEST_HG_REPO) == get_scm(TEST_HG_REPO)
 
        assert ('git', TEST_GIT_REPO) == get_scm(TEST_GIT_REPO)
 

	
 
    def test_get_two_scms_for_path(self):
 
        multialias_repo_path = os.path.join(TESTS_TMP_PATH, 'hg-git-repo-2')
 
        if os.path.isdir(multialias_repo_path):
 
            shutil.rmtree(multialias_repo_path)
 

	
 
        os.mkdir(multialias_repo_path)
 

	
 
        with pytest.raises(VCSError):
 
            get_scm(multialias_repo_path)
 

	
 
    def test_get_scm_error_path(self):
 
        with pytest.raises(VCSError):
 
            get_scm('err')
 

	
 
@@ -186,81 +185,66 @@ class TestAuthorExtractors(object):
 
                    ("Username Last'o'Name", "username.lastname@example.com")),
 
                  ('mrf RFC_SPEC <username+lastname@example.com>',
 
                    ('mrf RFC_SPEC', 'username+lastname@example.com')),
 
                  ('username <user@example.com>',
 
                    ('username', 'user@example.com')),
 
                  ('username <user@example.com',
 
                   ('username', 'user@example.com')),
 
                  ('broken missing@example.com',
 
                   ('broken', 'missing@example.com')),
 
                  ('<justemail@example.com>',
 
                   ('', 'justemail@example.com')),
 
                  ('justname',
 
                   ('justname', '')),
 
                  ('Mr Double Name withemail@example.com ',
 
                   ('Mr Double Name', 'withemail@example.com')),
 
                  (u'John Doe <джондо à éẋàṁṗłê.ç°ḿ>',
 
                   (u'John Doe <\u0434\u0436\u043e\u043d\u0434\u043e \xe0 \xe9\u1e8b\xe0\u1e41\u1e57\u0142\xea.\xe7\xb0\u1e3f>', '')),
 
                  ]
 

	
 
    def test_author_email(self):
 
        for test_str, result in self.TEST_AUTHORS:
 
            assert result[1] == author_email(test_str)
 

	
 
    def test_author_name(self):
 
        for test_str, result in self.TEST_AUTHORS:
 
            assert result[0] == author_name(test_str)
 

	
 

	
 
class TestGetDictForAttrs(object):
 

	
 
    def test_returned_dict_has_expected_attrs(self):
 
        obj = mock.Mock()
 
        obj.NOT_INCLUDED = 'this key/value should not be included'
 
        obj.CONST = True
 
        obj.foo = 'aaa'
 
        obj.attrs = {'foo': 'bar'}
 
        obj.date = datetime.datetime(2010, 12, 31)
 
        obj.count = 1001
 

	
 
        assert get_dict_for_attrs(obj, ['CONST', 'foo', 'attrs', 'date', 'count']) ==  {
 
            'CONST': True,
 
            'foo': 'aaa',
 
            'attrs': {'foo': 'bar'},
 
            'date': datetime.datetime(2010, 12, 31),
 
            'count': 1001,
 
        }
 

	
 

	
 
class TestGetTotalSeconds(object):
 

	
 
    def assertTotalSecondsEqual(self, timedelta, expected_seconds):
 
        result = get_total_seconds(timedelta)
 
        assert result == expected_seconds, \
 
            "We computed %s seconds for %s but expected %s" \
 
            % (result, timedelta, expected_seconds)
 

	
 
    def test_get_total_seconds_returns_proper_value(self):
 
        self.assertTotalSecondsEqual(datetime.timedelta(seconds=1001), 1001)
 

	
 
    def test_get_total_seconds_returns_proper_value_for_partial_seconds(self):
 
        self.assertTotalSecondsEqual(datetime.timedelta(seconds=50.65), 50.65)
 

	
 

	
 
class TestGetUserHome(object):
 

	
 
    @mock.patch.object(os, 'environ', {})
 
    def test_defaults_to_none(self):
 
        assert get_user_home() == ''
 

	
 
    @mock.patch.object(os, 'environ', {'HOME': '/home/foobar'})
 
    def test_unix_like(self):
 
        assert get_user_home() == '/home/foobar'
 

	
 
    @mock.patch.object(os, 'environ', {'USERPROFILE': '/Users/foobar'})
 
    def test_windows_like(self):
 
        assert get_user_home() == '/Users/foobar'
 

	
 
    @mock.patch.object(os, 'environ', {'HOME': '/home/foobar',
 
        'USERPROFILE': '/Users/foobar'})
 
    def test_prefers_home_over_userprofile(self):
 
        assert get_user_home() == '/home/foobar'
0 comments (0 inline, 0 general)