diff --git a/rhodecode/tests/vcs/utils.py b/rhodecode/tests/vcs/utils.py new file mode 100644 --- /dev/null +++ b/rhodecode/tests/vcs/utils.py @@ -0,0 +1,97 @@ +""" +Utilities for tests only. These are not or should not be used normally - +functions here are crafted as we don't want to use ``vcs`` to verify tests. +""" +import os +import re +import sys + +from subprocess import Popen + + +class VCSTestError(Exception): + pass + + +def run_command(cmd, args): + """ + Runs command on the system with given ``args``. + """ + command = ' '.join((cmd, args)) + p = Popen(command, shell=True) + status = os.waitpid(p.pid, 0)[1] + return status + + +def eprint(msg): + """ + Prints given ``msg`` into sys.stderr as nose test runner hides all output + from sys.stdout by default and if we want to pipe stream somewhere we don't + need those verbose messages anyway. + Appends line break. + """ + sys.stderr.write(msg) + sys.stderr.write('\n') + + +class SCMFetcher(object): + + def __init__(self, alias, test_repo_path, remote_repo, clone_cmd): + """ + :param clone_cmd: command which would clone remote repository; pass + only first bits - remote path and destination would be appended + using ``remote_repo`` and ``test_repo_path`` + """ + self.alias = alias + self.test_repo_path = test_repo_path + self.remote_repo = remote_repo + self.clone_cmd = clone_cmd + + def setup(self): + if not os.path.isdir(self.test_repo_path): + self.fetch_repo() + + def fetch_repo(self): + """ + Tries to fetch repository from remote path. + """ + remote = self.remote_repo + eprint("Fetching repository %s into %s" % (remote, self.test_repo_path)) + run_command(self.clone_cmd, '%s %s' % (remote, self.test_repo_path)) + + +def get_normalized_path(path): + """ + If given path exists, new path would be generated and returned. Otherwise + same whats given is returned. Assumes that there would be no more than + 10000 same named files. + """ + if os.path.exists(path): + dir, basename = os.path.split(path) + splitted_name = basename.split('.') + if len(splitted_name) > 1: + ext = splitted_name[-1] + else: + ext = None + name = '.'.join(splitted_name[:-1]) + matcher = re.compile(r'^.*-(\d{5})$') + start = 0 + m = matcher.match(name) + if not m: + # Haven't append number yet so return first + newname = '%s-00000' % name + newpath = os.path.join(dir, newname) + if ext: + newpath = '.'.join((newpath, ext)) + return get_normalized_path(newpath) + else: + start = int(m.group(1)[-5:]) + 1 + for x in xrange(start, 10000): + newname = name[:-5] + str(x).rjust(5, '0') + newpath = os.path.join(dir, newname) + if ext: + newpath = '.'.join((newpath, ext)) + if not os.path.exists(newpath): + return newpath + raise VCSTestError("Couldn't compute new path for %s" % path) + return path