Changeset - 6af08d44daa8
[Not reviewed]
default
0 2 0
Mads Kiilerich - 7 years ago 2018-08-08 02:23:11
mads@kiilerich.com
git: fix push to empty repo (Issue 323)

Git would fail to log revisions when the list of heads to exclude included an
empty string (in place of the pushed ref).

To avoid that, skip the skipped revision instead of making it an empty string.
`git log --not` works fine without providing any revisions to "not".

Verify in test_push_new_repo_git that it actually logged the push.
2 files changed with 7 insertions and 7 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/hooks.py
Show inline comments
 
@@ -354,118 +354,118 @@ def log_delete_user(user_dict, deleted_b
 
     'active',
 
     'password',
 
     'emails',
 
     'inherit_default_permissions'
 

	
 
    """
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'DELETE_USER_HOOK', None)
 
    if callable(callback):
 
        return callback(deleted_by=deleted_by, **user_dict)
 

	
 
    return 0
 

	
 

	
 
def _hook_environment(repo_path):
 
    """
 
    Create a light-weight environment for stand-alone scripts and return an UI and the
 
    db repository.
 

	
 
    Git hooks are executed as subprocess of Git while Kallithea is waiting, and
 
    they thus need enough info to be able to create an app environment and
 
    connect to the database.
 
    """
 
    from paste.deploy import appconfig
 
    from sqlalchemy import engine_from_config
 
    from kallithea.config.environment import load_environment
 
    from kallithea.model.base import init_model
 

	
 
    extras = _extract_extras()
 
    ini_file_path = extras['config']
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    app_conf = appconfig('config:%s' % ini_file_path)
 
    conf = load_environment(app_conf.global_conf, app_conf.local_conf)
 

	
 
    setup_cache_regions(conf)
 

	
 
    engine = engine_from_config(conf, 'sqlalchemy.')
 
    init_model(engine)
 

	
 
    repo_path = safe_unicode(repo_path)
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
 

	
 
    repo = Repository.get_by_full_path(repo_path)
 
    if not repo:
 
        raise OSError('Repository %s not found in database'
 
                      % (safe_str(repo_path)))
 

	
 
    baseui = make_ui('db')
 
    return baseui, repo
 

	
 

	
 
def handle_git_pre_receive(repo_path, git_stdin_lines):
 
    """Called from Git pre-receive hook"""
 
    baseui, repo = _hook_environment(repo_path)
 
    scm_repo = repo.scm_instance
 
    push_lock_handling(baseui, scm_repo)
 
    return 0
 

	
 

	
 
def handle_git_post_receive(repo_path, git_stdin_lines):
 
    """Called from Git post-receive hook"""
 
    baseui, repo = _hook_environment(repo_path)
 

	
 
    # the post push hook should never use the cached instance
 
    scm_repo = repo.scm_instance_no_cache()
 

	
 
    _hooks = dict(baseui.configitems('hooks')) or {}
 
    # if push hook is enabled via web interface
 
    if _hooks.get(Ui.HOOK_PUSH_LOG):
 
        rev_data = []
 
        for l in git_stdin_lines:
 
            old_rev, new_rev, ref = l.strip().split(' ')
 
            _ref_data = ref.split('/')
 
            if _ref_data[1] in ['tags', 'heads']:
 
                rev_data.append({'old_rev': old_rev,
 
                                 'new_rev': new_rev,
 
                                 'ref': ref,
 
                                 'type': _ref_data[1],
 
                                 'name': '/'.join(_ref_data[2:])})
 

	
 
        git_revs = []
 
        for push_ref in rev_data:
 
            _type = push_ref['type']
 
            if _type == 'heads':
 
                if push_ref['old_rev'] == EmptyChangeset().raw_id:
 
                    # update the symbolic ref if we push new repo
 
                    if scm_repo.is_empty():
 
                        scm_repo._repo.refs.set_symbolic_ref('HEAD',
 
                                            'refs/heads/%s' % push_ref['name'])
 

	
 
                    # build exclude list without the ref
 
                    cmd = ['for-each-ref', '--format=%(refname)', 'refs/heads/*']
 
                    stdout, stderr = scm_repo.run_git_command(cmd)
 
                    ref = push_ref['ref']
 
                    heads = [head if head != ref else '' for head in stdout.splitlines()]
 
                    heads = [head for head in stdout.splitlines() if head != ref]
 
                    # now list the git revs while excluding from the list
 
                    cmd = ['log', push_ref['new_rev'], '--reverse', '--pretty=format:%H']
 
                    cmd.append('--not')
 
                    cmd.extend(heads)
 
                    cmd.extend(heads) # empty list is ok
 
                    stdout, stderr = scm_repo.run_git_command(cmd)
 
                    git_revs += stdout.splitlines()
 

	
 
                elif push_ref['new_rev'] == EmptyChangeset().raw_id:
 
                    # delete branch case
 
                    git_revs += ['delete_branch=>%s' % push_ref['name']]
 
                else:
 
                    cmd = ['log', '%(old_rev)s..%(new_rev)s' % push_ref,
 
                           '--reverse', '--pretty=format:%H']
 
                    stdout, stderr = scm_repo.run_git_command(cmd)
 
                    git_revs += stdout.splitlines()
 

	
 
            elif _type == 'tags':
 
                git_revs += ['tag=>%s' % push_ref['name']]
 

	
 
        log_push_action(baseui, scm_repo, _git_revs=git_revs)
 
    return 0
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
@@ -197,206 +197,206 @@ class TestVCSOperations(TestController):
 
                Session().delete(entry)
 
        Session().commit()
 

	
 
    @pytest.fixture(scope="module")
 
    def testfork(self):
 
        # create fork so the repo stays untouched
 
        git_fork_name = u'%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, git_fork_name)
 
        hg_fork_name = u'%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, hg_fork_name)
 
        return {'git': git_fork_name, 'hg': hg_fork_name}
 

	
 
    def test_clone_hg_repo_by_admin(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_clone_git_repo_by_admin(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
 

	
 
    def test_clone_wrong_credentials_hg(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO, password='bad!')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_clone_wrong_credentials_git(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO, password='bad!')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    def test_clone_non_existing_path_hg(self, webserver):
 
        clone_url = webserver.repo_url('trololo')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_non_existing_path_git(self, webserver):
 
        clone_url = webserver.repo_url('trololo')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    def test_push_new_repo_git(self, webserver):
 
        # Clear the log so we know what is added
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        # Create an empty server repo using the API
 
        repo_name = u'new_git_%s' % _RandomNameSequence().next()
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        params = {
 
            "id": 7,
 
            "api_key": usr.api_key,
 
            "method": 'create_repo',
 
            "args": dict(repo_name=repo_name,
 
                         owner=TEST_USER_ADMIN_LOGIN,
 
                         repo_type='git'),
 
        }
 
        req = urllib2.Request(
 
            'http://%s:%s/_admin/api' % webserver.server_address,
 
            data=json.dumps(params),
 
            headers={'content-type': 'application/json'})
 
        response = urllib2.urlopen(req)
 
        result = json.loads(response.read())
 
        # Expect something like:
 
        # {u'result': {u'msg': u'Created new repository `new_git_XXX`', u'task': None, u'success': True}, u'id': 7, u'error': None}
 
        assert result[u'result'][u'success']
 

	
 
        # Create local clone of the empty server repo
 
        local_clone_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(repo_name)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, local_clone_dir, ignoreReturnCode=True)
 

	
 
        # Make 3 commits and push to the empty server repo.
 
        # The server repo doesn't have any other heads than the
 
        # refs/heads/master we are pushing, but the `git log` in the push hook
 
        # should still list the 3 commits.
 
        stdout, stderr = _add_files_and_push(webserver, 'git', local_clone_dir, clone_url=clone_url)
 
        # FIXME: the push kind of failed with something like:
 
        # remote: fatal: ambiguous argument '': unknown revision or path not in the working tree.
 
        assert 'remote: fatal' in stderr
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        # Verify that we got the right events in UserLog. Expect something like:
 
        # <UserLog('id:new_git_XXX:started_following_repo')>
 
        # <UserLog('id:new_git_XXX:user_created_repo')>
 
        # <UserLog('id:new_git_XXX:pull')>
 
        # - but no push logging
 
        # <UserLog('id:new_git_XXX:push:aed9d4c1732a1927da3be42c47eb9afdc200d427,d38b083a07af10a9f44193486959a96a23db78da,4841ff9a2b385bec995f4679ef649adb3f437622')>
 
        uls = list(UserLog.query().order_by(UserLog.user_log_id))
 
        assert len(uls) == 3
 
        assert len(uls) == 4
 
        assert uls[0].action == 'started_following_repo'
 
        assert uls[1].action == 'user_created_repo'
 
        assert uls[2].action == 'pull'
 
        assert uls[3].action.startswith(u'push:')
 
        assert uls[3].action.count(',') == 2 # expect 3 commits
 

	
 
    def test_push_new_file_hg(self, webserver, testfork):
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        clone_url = webserver.repo_url(testfork['hg'])
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
    def test_push_new_file_git(self, webserver, testfork):
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # commit some stuff into this repo
 
        clone_url = webserver.repo_url(testfork['git'])
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
 
        print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for
 
        _check_proper_git_push(stdout, stderr)
 

	
 
    def test_push_invalidates_cache_hg(self, webserver, testfork):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == HG_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(HG_REPO, HG_REPO)
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(testfork['hg'])
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, files_no=1, clone_url=clone_url)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == testfork['hg']).all()
 
        assert key == []
 

	
 
    def test_push_invalidates_cache_git(self, webserver, testfork):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == GIT_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(GIT_REPO, GIT_REPO)
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(testfork['git'])
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        # commit some stuff into this repo
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, files_no=1, clone_url=clone_url)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == testfork['git']).all()
 
        assert key == []
 

	
 
    def test_push_wrong_credentials_hg(self, webserver):
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, username='bad',
 
                                             password='name', ignoreReturnCode=True)
 

	
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_push_wrong_credentials_git(self, webserver):
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, username='bad',
 
                                             password='name', ignoreReturnCode=True)
 

	
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_push_with_readonly_credentials_hg(self, webserver):
 
        dest_dir = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO, username=TEST_USER_REGULAR_LOGIN, password=TEST_USER_REGULAR_PASS)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, username=TEST_USER_REGULAR_LOGIN,
 
                                             password=TEST_USER_REGULAR_PASS, ignoreReturnCode=True)
 

	
 
        assert 'abort: HTTP Error 403: Forbidden' in stderr
 

	
0 comments (0 inline, 0 general)