Changeset - d2901d906ef3
[Not reviewed]
beta
0 4 0
Marcin Kuzminski - 13 years ago 2012-07-18 18:42:08
marcin@python-works.com
Fixed issue #501 error on setting set_as_fork to same repo
- also fixed html fill select set
- wrote set of tests for that option
4 files changed with 63 insertions and 6 deletions:
0 comments (0 inline, 0 general)
rhodecode/controllers/admin/repos.py
Show inline comments
 
@@ -75,98 +75,100 @@ class ReposController(BaseController):
 

	
 
    def __load_data(self, repo_name=None):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param repo_name:
 
        """
 
        self.__load_defaults()
 

	
 
        c.repo_info = db_repo = Repository.get_by_repo_name(repo_name)
 
        repo = db_repo.scm_instance
 

	
 
        if c.repo_info is None:
 
            h.flash(_('%s repository is not mapped to db perhaps'
 
                      ' it was created or renamed from the filesystem'
 
                      ' please run the application again'
 
                      ' in order to rescan repositories') % repo_name,
 
                      category='error')
 

	
 
            return redirect(url('repos'))
 

	
 
        choices, c.landing_revs = ScmModel().get_repo_landing_revs(c.repo_info)
 
        c.landing_revs_choices = choices
 

	
 
        c.default_user_id = User.get_by_username('default').user_id
 
        c.in_public_journal = UserFollowing.query()\
 
            .filter(UserFollowing.user_id == c.default_user_id)\
 
            .filter(UserFollowing.follows_repository == c.repo_info).scalar()
 

	
 
        if c.repo_info.stats:
 
            # this is on what revision we ended up so we add +1 for count
 
            last_rev = c.repo_info.stats.stat_on_revision + 1
 
        else:
 
            last_rev = 0
 
        c.stats_revision = last_rev
 

	
 
        c.repo_last_rev = repo.count() if repo.revisions else 0
 

	
 
        if last_rev == 0 or c.repo_last_rev == 0:
 
            c.stats_percentage = 0
 
        else:
 
            c.stats_percentage = '%.2f' % ((float((last_rev)) /
 
                                            c.repo_last_rev) * 100)
 

	
 
        defaults = RepoModel()._get_defaults(repo_name)
 

	
 
        c.repos_list = [('', _('--REMOVE FORK--'))]
 
        c.repos_list += [(x.repo_id, x.repo_name) for x in
 
                   Repository.query().order_by(Repository.repo_name).all()]
 
                    Repository.query().order_by(Repository.repo_name).all()
 
                    if x.repo_id != c.repo_info.repo_id]
 

	
 
        defaults['id_fork_of'] = db_repo.fork.repo_id if db_repo.fork else ''
 
        return defaults
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def index(self, format='html'):
 
        """GET /repos: All items in the collection"""
 
        # url('repos')
 

	
 
        c.repos_list = ScmModel().get_repos(Repository.query()
 
                                            .order_by(Repository.repo_name)
 
                                            .all(), sort_key='name_sort')
 
        return render('admin/repos/repos.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def create(self):
 
        """
 
        POST /repos: Create a new item"""
 
        # url('repos')
 

	
 
        self.__load_defaults()
 
        form_result = {}
 
        try:
 
            form_result = RepoForm(repo_groups=c.repo_groups_choices,
 
                                   landing_revs=c.landing_revs_choices)()\
 
                            .to_python(dict(request.POST))
 
            RepoModel().create(form_result, self.rhodecode_user.user_id)
 
            if form_result['clone_uri']:
 
                h.flash(_('created repository %s from %s') \
 
                    % (form_result['repo_name'], form_result['clone_uri']),
 
                    category='success')
 
            else:
 
                h.flash(_('created repository %s') % form_result['repo_name'],
 
                    category='success')
 

	
 
            if request.POST.get('user_created'):
 
                # created by regular non admin user
 
                action_logger(self.rhodecode_user, 'user_created_repo',
 
                              form_result['repo_name_full'], self.ip_addr,
 
                              self.sa)
 
            else:
 
                action_logger(self.rhodecode_user, 'admin_created_repo',
 
                              form_result['repo_name_full'], self.ip_addr,
 
                              self.sa)
 
            Session.commit()
 
        except formencode.Invalid, errors:
 

	
 
            c.new_repo = errors.value['repo_name']
 

	
 
            if request.POST.get('user_created'):
 
@@ -372,75 +374,75 @@ class ReposController(BaseController):
 
        cur_token = request.POST.get('auth_token')
 
        token = get_token()
 
        if cur_token == token:
 
            try:
 
                repo_id = Repository.get_by_repo_name(repo_name).repo_id
 
                user_id = User.get_by_username('default').user_id
 
                self.scm_model.toggle_following_repo(repo_id, user_id)
 
                h.flash(_('Updated repository visibility in public journal'),
 
                        category='success')
 
                Session.commit()
 
            except:
 
                h.flash(_('An error occurred during setting this'
 
                          ' repository in public journal'),
 
                        category='error')
 

	
 
        else:
 
            h.flash(_('Token mismatch'), category='error')
 
        return redirect(url('edit_repo', repo_name=repo_name))
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def repo_pull(self, repo_name):
 
        """
 
        Runs task to update given repository with remote changes,
 
        ie. make pull on remote location
 

	
 
        :param repo_name:
 
        """
 
        try:
 
            ScmModel().pull_changes(repo_name, self.rhodecode_user.username)
 
            h.flash(_('Pulled from remote location'), category='success')
 
        except Exception, e:
 
            h.flash(_('An error occurred during pull from remote location'),
 
                    category='error')
 

	
 
        return redirect(url('edit_repo', repo_name=repo_name))
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def repo_as_fork(self, repo_name):
 
        """
 
        Mark given repository as a fork of another
 

	
 
        :param repo_name:
 
        """
 
        try:
 
            fork_id = request.POST.get('id_fork_of')
 
            repo = ScmModel().mark_as_fork(repo_name, fork_id,
 
                                    self.rhodecode_user.username)
 
            fork = repo.fork.repo_name if repo.fork else _('Nothing')
 
            Session.commit()
 
            h.flash(_('Marked repo %s as fork of %s') % (repo_name,fork),
 
            Session().commit()
 
            h.flash(_('Marked repo %s as fork of %s') % (repo_name, fork),
 
                    category='success')
 
        except Exception, e:
 
            raise
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during this operation'),
 
                    category='error')
 

	
 
        return redirect(url('edit_repo', repo_name=repo_name))
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def show(self, repo_name, format='html'):
 
        """GET /repos/repo_name: Show a specific item"""
 
        # url('repo', repo_name=ID)
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def edit(self, repo_name, format='html'):
 
        """GET /repos/repo_name/edit: Form to edit an existing item"""
 
        # url('edit_repo', repo_name=ID)
 
        defaults = self.__load_data(repo_name)
 

	
 
        return htmlfill.render(
 
            render('admin/repos/repo_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
rhodecode/model/scm.py
Show inline comments
 
@@ -335,96 +335,98 @@ class ScmModel(BaseModel):
 

	
 
        try:
 
            f = UserFollowing()
 
            f.user_id = user_id
 
            f.follows_user_id = follow_user_id
 
            self.sa.add(f)
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def is_following_repo(self, repo_name, user_id, cache=False):
 
        r = self.sa.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        f = self.sa.query(UserFollowing)\
 
            .filter(UserFollowing.follows_repository == r)\
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        return f is not None
 

	
 
    def is_following_user(self, username, user_id, cache=False):
 
        u = User.get_by_username(username)
 

	
 
        f = self.sa.query(UserFollowing)\
 
            .filter(UserFollowing.follows_user == u)\
 
            .filter(UserFollowing.user_id == user_id).scalar()
 

	
 
        return f is not None
 

	
 
    def get_followers(self, repo):
 
        repo = self._get_repo(repo)
 

	
 
        return self.sa.query(UserFollowing)\
 
                .filter(UserFollowing.follows_repository == repo).count()
 

	
 
    def get_forks(self, repo):
 
        repo = self._get_repo(repo)
 
        return self.sa.query(Repository)\
 
                .filter(Repository.fork == repo).count()
 

	
 
    def get_pull_requests(self, repo):
 
        repo = self._get_repo(repo)
 
        return self.sa.query(PullRequest)\
 
                .filter(PullRequest.other_repo == repo).count()
 

	
 
    def mark_as_fork(self, repo, fork, user):
 
        repo = self.__get_repo(repo)
 
        fork = self.__get_repo(fork)
 
        if fork and repo.repo_id == fork.repo_id:
 
            raise Exception("Cannot set repository as fork of itself")
 
        repo.fork = fork
 
        self.sa.add(repo)
 
        return repo
 

	
 
    def pull_changes(self, repo, username):
 
        dbrepo = self.__get_repo(repo)
 
        clone_uri = dbrepo.clone_uri
 
        if not clone_uri:
 
            raise Exception("This repository doesn't have a clone uri")
 

	
 
        repo = dbrepo.scm_instance
 
        try:
 
            extras = {
 
                'ip': '',
 
                'username': username,
 
                'action': 'push_remote',
 
                'repository': dbrepo.repo_name,
 
                'scm': repo.alias,
 
            }
 
            Repository.inject_ui(repo, extras=extras)
 

	
 
            if repo.alias == 'git':
 
                repo.fetch(clone_uri)
 
            else:
 
                repo.pull(clone_uri)
 
            self.mark_for_invalidation(dbrepo.repo_name)
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def commit_change(self, repo, repo_name, cs, user, author, message,
 
                      content, f_path):
 

	
 
        if repo.alias == 'hg':
 
            from rhodecode.lib.vcs.backends.hg import \
 
                MercurialInMemoryChangeset as IMC
 
        elif repo.alias == 'git':
 
            from rhodecode.lib.vcs.backends.git import \
 
                GitInMemoryChangeset as IMC
 

	
 
        # decoding here will force that we have proper encoded values
 
        # in any other case this will throw exceptions and deny commit
 
        content = safe_str(content)
 
        path = safe_str(f_path)
 
        # message and author needs to be unicode
 
        # proper backend should then translate that into required type
 
        message = safe_unicode(message)
 
        author = safe_unicode(author)
rhodecode/tests/__init__.py
Show inline comments
 
@@ -115,50 +115,50 @@ def get_new_dir(title):
 
    from rhodecode.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.Session = Session
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['rhodecode_user']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['rhodecode_user']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def checkSessionFlash(self, response, msg):
 
        self.assertTrue('flash' in response.session)
 
        if not msg in response.session['flash'][0][1]:
 
            self.fail(
 
                'msg `%s` not found in session flash: got `%s` instead' % (
 
                      msg, response.session['flash'][0][1])
 
                      msg, response.session['flash'])
 
            )
rhodecode/tests/functional/test_admin_settings.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from rhodecode.lib.auth import get_crypt_password, check_password
 
from rhodecode.model.db import User, RhodeCodeSetting
 
from rhodecode.model.db import User, RhodeCodeSetting, Repository
 
from rhodecode.tests import *
 
from rhodecode.lib import helpers as h
 
from rhodecode.model.user import UserModel
 
from rhodecode.model.scm import ScmModel
 

	
 

	
 
class TestAdminSettingsController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url('admin_settings'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_admin_settings', format='xml'))
 

	
 
    def test_create(self):
 
        response = self.app.post(url('admin_settings'))
 

	
 
    def test_new(self):
 
        response = self.app.get(url('admin_new_setting'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_admin_new_setting', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('admin_setting', setting_id=1))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        response = self.app.delete(url('admin_setting', setting_id=1))
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        response = self.app.get(url('admin_setting', setting_id=1))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_admin_setting', setting_id=1, format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('admin_edit_setting', setting_id=1))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_admin_edit_setting',
 
                                    setting_id=1, format='xml'))
 

	
 
    def test_ga_code_active(self):
 
        self.log_user()
 
        old_title = 'RhodeCode'
 
@@ -166,48 +167,100 @@ class TestAdminSettingsController(TestCo
 
                #my account cannot deactivate account
 
                params['active'] = True
 
            if name == 'admin':
 
                #my account cannot make you an admin !
 
                params['admin'] = False
 

	
 
            self.assertEqual(params, updated_params)
 

	
 
        finally:
 
            UserModel().delete('testme')
 

	
 
    def test_my_account_update_err_email_exists(self):
 
        self.log_user()
 

	
 
        new_email = 'test_regular@mail.com'  # already exisitn email
 
        response = self.app.put(url('admin_settings_my_account_update'),
 
                                params=dict(
 
                                    username='test_admin',
 
                                    new_password='test12',
 
                                    password_confirmation='test122',
 
                                    firstname='NewName',
 
                                    lastname='NewLastname',
 
                                    email=new_email,)
 
                                )
 

	
 
        response.mustcontain('This e-mail address is already taken')
 

	
 
    def test_my_account_update_err(self):
 
        self.log_user('test_regular2', 'test12')
 

	
 
        new_email = 'newmail.pl'
 
        response = self.app.post(url('admin_settings_my_account_update'),
 
                                 params=dict(
 
                                            _method='put',
 
                                            username='test_admin',
 
                                            new_password='test12',
 
                                            password_confirmation='test122',
 
                                            firstname='NewName',
 
                                            lastname='NewLastname',
 
                                            email=new_email,)
 
                                 )
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        from rhodecode.model import validators
 
        msg = validators.ValidUsername(edit=False,
 
                                    old_data={})._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': 'test_admin'})
 
        response.mustcontain(u"%s" % msg)
 

	
 
    def test_set_repo_fork_has_no_self_id(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        response = self.app.get(url('edit_repo', repo_name=HG_REPO))
 
        opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
 
        assert opt not in response.body
 

	
 
    def test_set_fork_of_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        repo2 = Repository.get_by_repo_name(GIT_REPO)
 
        response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
 
                                 params=dict(
 
                                    id_fork_of=repo2.repo_id
 
                                 ))
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        repo2 = Repository.get_by_repo_name(GIT_REPO)
 
        self.checkSessionFlash(response,
 
        'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
 

	
 
        assert repo.fork == repo2
 
        response = response.follow()
 
        # check if given repo is selected
 

	
 
        opt = """<option value="%s" selected="selected">%s</option>""" % (
 
                                                repo2.repo_id, repo2.repo_name)
 
        response.mustcontain(opt)
 

	
 
        # clean session flash
 
        #response = self.app.get(url('edit_repo', repo_name=HG_REPO))
 

	
 
        ## mark it as None
 
        response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
 
                                 params=dict(
 
                                    id_fork_of=None
 
                                 ))
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        repo2 = Repository.get_by_repo_name(GIT_REPO)
 
        self.checkSessionFlash(response,
 
        'Marked repo %s as fork of %s' % (repo.repo_name, "Nothing"))
 
        assert repo.fork == None
 

	
 
    def test_set_fork_of_same_repo(self):
 
        self.log_user()
 
        repo = Repository.get_by_repo_name(HG_REPO)
 
        response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
 
                                 params=dict(
 
                                    id_fork_of=repo.repo_id
 
                                 ))
 
        self.checkSessionFlash(response,
 
                               'An error occurred during this operation')
 
\ No newline at end of file
0 comments (0 inline, 0 general)