# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
Test suite for vcs push/pull operations.

The tests need Git > 1.8.1.

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Dec 30, 2010
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.

"""

import os
import re
import tempfile
import time
import pytest

from tempfile import _RandomNameSequence
from subprocess import Popen, PIPE

from kallithea.tests.base import *
from kallithea.tests.fixture import Fixture
from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation
from kallithea.model.meta import Session
from kallithea.model.repo import RepoModel
from kallithea.model.user import UserModel

DEBUG = True
HOST = '127.0.0.1:4999'  # test host

fixture = Fixture()


class Command(object):

    def __init__(self, cwd):
        self.cwd = cwd

    def execute(self, cmd, *args, **environ):
        """
        Runs command on the system with given ``args``.
        """

        command = cmd + ' ' + ' '.join(args)
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
        if DEBUG:
            print '*** CMD %s ***' % command
        testenv = dict(os.environ)
        testenv['LANG'] = 'en_US.UTF-8'
        testenv['LANGUAGE'] = 'en_US:en'
        testenv['HGPLAIN'] = ''
        testenv['HGRCPATH'] = ''
        testenv.update(environ)
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
        stdout, stderr = p.communicate()
        if DEBUG:
            if stdout:
                print 'stdout:', repr(stdout)
            if stderr:
                print 'stderr:', repr(stderr)
        if not ignoreReturnCode:
            assert p.returncode == 0
        return stdout, stderr


def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
    return tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix=prefix, suffix=suffix)


def _add_files(vcs, dest_dir, files_no=3):
    """
    Generate some files, add it to dest_dir repo and push back
    vcs is git or hg and defines what VCS we want to make those files for

    :param vcs:
    :param dest_dir:
    """
    added_file = '%ssetup.py' % _RandomNameSequence().next()
    open(os.path.join(dest_dir, added_file), 'a').close()
    Command(dest_dir).execute('%s add %s' % (vcs, added_file))

    email = 'me@example.com'
    if os.name == 'nt':
        author_str = 'User <%s>' % email
    else:
        author_str = 'User ǝɯɐᴎ <%s>' % email
    for i in xrange(files_no):
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
        Command(dest_dir).execute(cmd)
        if vcs == 'hg':
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
                i, author_str, added_file
            )
        elif vcs == 'git':
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
                i, author_str, added_file
            )
        # git commit needs EMAIL on some machines
        Command(dest_dir).execute(cmd, EMAIL=email)

def _add_files_and_push(webserver, vcs, dest_dir, ignoreReturnCode=False, files_no=3,
                            clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
    _add_files(vcs, dest_dir, files_no=files_no)
    # PUSH it back
    _REPO = None
    if vcs == 'hg':
        _REPO = HG_REPO
    elif vcs == 'git':
        _REPO = GIT_REPO

    if clone_url is None:
        clone_url = webserver.repo_url(_REPO, username=username, password=password)

    stdout = stderr = None
    if vcs == 'hg':
        stdout, stderr = Command(dest_dir).execute('hg push --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
    elif vcs == 'git':
        stdout, stderr = Command(dest_dir).execute('git push --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)

    return stdout, stderr


def set_anonymous_access(enable=True):
    user = User.get_default_user()
    user.active = enable
    Session().commit()
    print '\tanonymous access is now:', enable
    if enable != User.get_default_user().active:
        raise Exception('Cannot set anonymous access')


#==============================================================================
# TESTS
#==============================================================================


def _check_proper_git_push(stdout, stderr):
    # WTF Git stderr is output ?!
    assert 'fatal' not in stderr
    assert 'rejected' not in stderr
    assert 'Pushing to' in stderr
    assert 'master -> master' in stderr


@pytest.mark.usefixtures("test_context_fixture")
class TestVCSOperations(TestController):

    @classmethod
    def setup_class(cls):
        # DISABLE ANONYMOUS ACCESS
        set_anonymous_access(False)

    def setup_method(self, method):
        r = Repository.get_by_repo_name(GIT_REPO)
        Repository.unlock(r)
        r.enable_locking = False
        Session().commit()

        r = Repository.get_by_repo_name(HG_REPO)
        Repository.unlock(r)
        r.enable_locking = False
        Session().commit()

    def test_clone_hg_repo_by_admin(self, webserver):
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())

        assert 'requesting all changes' in stdout
        assert 'adding changesets' in stdout
        assert 'adding manifests' in stdout
        assert 'adding file changes' in stdout

        assert stderr == ''

    def test_clone_git_repo_by_admin(self, webserver):
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())

        assert 'Cloning into' in stdout + stderr
        assert stderr == '' or stdout == ''

    def test_clone_wrong_credentials_hg(self, webserver):
        clone_url = webserver.repo_url(HG_REPO, password='bad!')
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'abort: authorization failed' in stderr

    def test_clone_wrong_credentials_git(self, webserver):
        clone_url = webserver.repo_url(GIT_REPO, password='bad!')
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'fatal: Authentication failed' in stderr

    def test_clone_git_dir_as_hg(self, webserver):
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'HTTP Error 404: Not Found' in stderr

    def test_clone_hg_repo_as_git(self, webserver):
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'not found' in stderr

    def test_clone_non_existing_path_hg(self, webserver):
        clone_url = webserver.repo_url('trololo')
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'HTTP Error 404: Not Found' in stderr

    def test_clone_non_existing_path_git(self, webserver):
        clone_url = webserver.repo_url('trololo')
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        assert 'not found' in stderr

    def test_push_new_file_hg(self, webserver):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)

        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
        fixture.create_fork(HG_REPO, fork_name)
        clone_url = webserver.repo_url(fork_name)
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)

        assert 'pushing to' in stdout
        assert 'Repository size' in stdout
        assert 'Last revision is now' in stdout

    def test_push_new_file_git(self, webserver):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)

        # commit some stuff into this repo
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
        fixture.create_fork(GIT_REPO, fork_name)
        clone_url = webserver.repo_url(fork_name)
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
        print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for
        _check_proper_git_push(stdout, stderr)

    def test_push_invalidates_cache_hg(self, webserver):
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               == HG_REPO).scalar()
        if not key:
            key = CacheInvalidation(HG_REPO, HG_REPO)
            Session().add(key)

        key.cache_active = True
        Session().commit()

        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)

        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
        fixture.create_fork(HG_REPO, fork_name)
        clone_url = webserver.repo_url(fork_name)
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, files_no=1, clone_url=clone_url)

        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               == fork_name).all()
        assert key == []

    def test_push_invalidates_cache_git(self, webserver):
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               == GIT_REPO).scalar()
        if not key:
            key = CacheInvalidation(GIT_REPO, GIT_REPO)
            Session().add(key)

        key.cache_active = True
        Session().commit()

        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)

        # commit some stuff into this repo
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
        fixture.create_fork(GIT_REPO, fork_name)
        clone_url = webserver.repo_url(fork_name)
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, files_no=1, clone_url=clone_url)
        _check_proper_git_push(stdout, stderr)

        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
                                               == fork_name).all()
        assert key == []

    def test_push_wrong_credentials_hg(self, webserver):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, username='bad',
                                             password='name', ignoreReturnCode=True)

        assert 'abort: authorization failed' in stderr

    def test_push_wrong_credentials_git(self, webserver):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, username='bad',
                                             password='name', ignoreReturnCode=True)

        assert 'fatal: Authentication failed' in stderr

    def test_push_with_readonly_credentials_hg(self, webserver):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(HG_REPO, username=TEST_USER_REGULAR_LOGIN, password=TEST_USER_REGULAR_PASS)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, username=TEST_USER_REGULAR_LOGIN,
                                             password=TEST_USER_REGULAR_PASS, ignoreReturnCode=True)

        assert 'abort: HTTP Error 403: Forbidden' in stderr

    def test_push_with_readonly_credentials_git(self, webserver):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(GIT_REPO, username=TEST_USER_REGULAR_LOGIN, password=TEST_USER_REGULAR_PASS)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, username=TEST_USER_REGULAR_LOGIN,
                                             password=TEST_USER_REGULAR_PASS, ignoreReturnCode=True)

        assert 'The requested URL returned error: 403' in stderr

    def test_push_back_to_wrong_url_hg(self, webserver):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(
            webserver, 'hg', dest_dir, clone_url='http://%s:%s/tmp' % (
                webserver.server_address[0], webserver.server_address[1]),
            ignoreReturnCode=True)

        assert 'HTTP Error 404: Not Found' in stderr

    def test_push_back_to_wrong_url_git(self, webserver):
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)

        stdout, stderr = _add_files_and_push(
            webserver, 'git', dest_dir, clone_url='http://%s:%s/tmp' % (
                webserver.server_address[0], webserver.server_address[1]),
            ignoreReturnCode=True)

        assert 'not found' in stderr

    def test_clone_and_create_lock_hg(self, webserver):
        # enable locking
        r = Repository.get_by_repo_name(HG_REPO)
        r.enable_locking = True
        Session().commit()
        # clone
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())

        # check if lock was made
        r = Repository.get_by_repo_name(HG_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

    def test_clone_and_create_lock_git(self, webserver):
        # enable locking
        r = Repository.get_by_repo_name(GIT_REPO)
        r.enable_locking = True
        Session().commit()
        # clone
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())

        # check if lock was made
        r = Repository.get_by_repo_name(GIT_REPO)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

    def test_clone_after_repo_was_locked_hg(self, webserver):
        # lock repo
        r = Repository.get_by_repo_name(HG_REPO)
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
        # pull fails since repo is locked
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
        assert msg in stderr

    def test_clone_after_repo_was_locked_git(self, webserver):
        # lock repo
        r = Repository.get_by_repo_name(GIT_REPO)
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
        # pull fails since repo is locked
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
        msg = ("""The requested URL returned error: 423""")
        assert msg in stderr

    def test_push_on_locked_repo_by_other_user_hg(self, webserver):
        # clone some temp
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)

        # lock repo
        r = Repository.get_by_repo_name(HG_REPO)
        # let this user actually push !
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
                                          perm='repository.write')
        Session().commit()
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)

        # push fails repo is locked by other user !
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir,
                                             username=TEST_USER_REGULAR_LOGIN,
                                             password=TEST_USER_REGULAR_PASS,
                                             ignoreReturnCode=True)
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
        assert msg in stderr

    def test_push_on_locked_repo_by_other_user_git(self, webserver):
        # Note: Git hooks must be executable on unix. This test will thus fail
        # for example on Linux if /tmp is mounted noexec.

        # clone some temp
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)

        # lock repo
        r = Repository.get_by_repo_name(GIT_REPO)
        # let this user actually push !
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
                                          perm='repository.write')
        Session().commit()
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)

        # push fails repo is locked by other user !
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir,
                                             username=TEST_USER_REGULAR_LOGIN,
                                             password=TEST_USER_REGULAR_PASS,
                                             ignoreReturnCode=True)
        err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN)
        assert err in stderr

        # TODO: fix this somehow later on Git, Git is stupid and even if we throw
        # back 423 to it, it makes ANOTHER request and we fail there with 405 :/

        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
        #msg = "405 Method Not Allowed"
        #assert msg in stderr

    def test_push_unlocks_repository_hg(self, webserver):
        # enable locking
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
        fixture.create_fork(HG_REPO, fork_name)
        r = Repository.get_by_repo_name(fork_name)
        r.enable_locking = True
        Session().commit()
        # clone some temp
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(fork_name)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, dest_dir)

        # check for lock repo after clone
        r = Repository.get_by_repo_name(fork_name)
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
        assert r.locked[0] == uid

        # push is ok and repo is now unlocked
        stdout, stderr = _add_files_and_push(webserver, 'hg', dest_dir, clone_url=clone_url)
        assert str('remote: Released lock on repo `%s`' % fork_name) in stdout
        # we need to cleanup the Session Here !
        Session.remove()
        r = Repository.get_by_repo_name(fork_name)
        assert r.locked == [None, None]

    # TODO: fix me ! somehow during tests hooks don't get called on Git
    def test_push_unlocks_repository_git(self, webserver):
        # enable locking
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
        fixture.create_fork(GIT_REPO, fork_name)
        r = Repository.get_by_repo_name(fork_name)
        r.enable_locking = True
        Session().commit()
        # clone some temp
        dest_dir = _get_tmp_dir()
        clone_url = webserver.repo_url(fork_name)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, dest_dir)

        # check for lock repo after clone
        r = Repository.get_by_repo_name(fork_name)
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id

        # push is ok and repo is now unlocked
        stdout, stderr = _add_files_and_push(webserver, 'git', dest_dir, clone_url=clone_url)
        _check_proper_git_push(stdout, stderr)

        assert ('remote: Released lock on repo `%s`' % fork_name) in stderr
        # we need to cleanup the Session Here !
        Session.remove()
        r = Repository.get_by_repo_name(fork_name)
        assert r.locked == [None, None]

    def test_ip_restriction_hg(self, webserver):
        user_model = UserModel()
        try:
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            Session().commit()
            clone_url = webserver.repo_url(HG_REPO)
            stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
            assert 'abort: HTTP Error 403: Forbidden' in stderr
        finally:
            # release IP restrictions
            for ip in UserIpMap.query():
                UserIpMap.delete(ip.ip_id)
            Session().commit()

        # IP permissions are cached, need to wait for the cache in the server process to expire
        time.sleep(1.5)

        clone_url = webserver.repo_url(HG_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())

        assert 'requesting all changes' in stdout
        assert 'adding changesets' in stdout
        assert 'adding manifests' in stdout
        assert 'adding file changes' in stdout

        assert stderr == ''

    def test_ip_restriction_git(self, webserver):
        user_model = UserModel()
        try:
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
            Session().commit()
            clone_url = webserver.repo_url(GIT_REPO)
            stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
            # The message apparently changed in Git 1.8.3, so match it loosely.
            assert re.search(r'\b403\b', stderr)
        finally:
            # release IP restrictions
            for ip in UserIpMap.query():
                UserIpMap.delete(ip.ip_id)
            Session().commit()

        # IP permissions are cached, need to wait for the cache in the server process to expire
        time.sleep(1.5)

        clone_url = webserver.repo_url(GIT_REPO)
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())

        assert 'Cloning into' in stdout + stderr
        assert stderr == '' or stdout == ''
