Changeset - 2576a20d94ca
[Not reviewed]
beta
0 4 0
Marcin Kuzminski - 13 years ago 2013-05-12 00:41:38
marcin@python-works.com
Gist: don't allow files inside directories when creating gists
4 files changed with 35 insertions and 3 deletions:
0 comments (0 inline, 0 general)
rhodecode/model/forms.py
Show inline comments
 
@@ -331,104 +331,105 @@ def DefaultPermissionsForm(repo_perms_ch
 

	
 
        default_register = v.OneOf(register_choices)
 
        default_extern_activate = v.OneOf(extern_activate_choices)
 
    return _DefaultPermissionsForm
 

	
 

	
 
def CustomDefaultPermissionsForm():
 
    class _CustomDefaultPermissionsForm(formencode.Schema):
 
        filter_extra_fields = True
 
        allow_extra_fields = True
 
        inherit_default_permissions = v.StringBoolean(if_missing=False)
 

	
 
        create_repo_perm = v.StringBoolean(if_missing=False)
 
        create_user_group_perm = v.StringBoolean(if_missing=False)
 
        #create_repo_group_perm Impl. later
 

	
 
        fork_repo_perm = v.StringBoolean(if_missing=False)
 

	
 
    return _CustomDefaultPermissionsForm
 

	
 

	
 
def DefaultsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
 
    class _DefaultsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        default_repo_type = v.OneOf(supported_backends)
 
        default_repo_private = v.StringBoolean(if_missing=False)
 
        default_repo_enable_statistics = v.StringBoolean(if_missing=False)
 
        default_repo_enable_downloads = v.StringBoolean(if_missing=False)
 
        default_repo_enable_locking = v.StringBoolean(if_missing=False)
 

	
 
    return _DefaultsForm
 

	
 

	
 
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices,
 
                     tls_kind_choices):
 
    class _LdapSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        #pre_validators = [LdapLibValidator]
 
        ldap_active = v.StringBoolean(if_missing=False)
 
        ldap_host = v.UnicodeString(strip=True,)
 
        ldap_port = v.Number(strip=True,)
 
        ldap_tls_kind = v.OneOf(tls_kind_choices)
 
        ldap_tls_reqcert = v.OneOf(tls_reqcert_choices)
 
        ldap_dn_user = v.UnicodeString(strip=True,)
 
        ldap_dn_pass = v.UnicodeString(strip=True,)
 
        ldap_base_dn = v.UnicodeString(strip=True,)
 
        ldap_filter = v.UnicodeString(strip=True,)
 
        ldap_search_scope = v.OneOf(search_scope_choices)
 
        ldap_attr_login = v.AttrLoginValidator()(not_empty=True)
 
        ldap_attr_firstname = v.UnicodeString(strip=True,)
 
        ldap_attr_lastname = v.UnicodeString(strip=True,)
 
        ldap_attr_email = v.UnicodeString(strip=True,)
 

	
 
    return _LdapSettingsForm
 

	
 

	
 
def UserExtraEmailForm():
 
    class _UserExtraEmailForm(formencode.Schema):
 
        email = All(v.UniqSystemEmail(), v.Email(not_empty=True))
 
    return _UserExtraEmailForm
 

	
 

	
 
def UserExtraIpForm():
 
    class _UserExtraIpForm(formencode.Schema):
 
        ip = v.ValidIp()(not_empty=True)
 
    return _UserExtraIpForm
 

	
 

	
 
def PullRequestForm(repo_id):
 
    class _PullRequestForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        user = v.UnicodeString(strip=True, required=True)
 
        org_repo = v.UnicodeString(strip=True, required=True)
 
        org_ref = v.UnicodeString(strip=True, required=True)
 
        other_repo = v.UnicodeString(strip=True, required=True)
 
        other_ref = v.UnicodeString(strip=True, required=True)
 
        revisions = All(#v.NotReviewedRevisions(repo_id)(),
 
                        v.UniqueList(not_empty=True))
 
        review_members = v.UniqueList(not_empty=True)
 

	
 
        pullrequest_title = v.UnicodeString(strip=True, required=True, min=3)
 
        pullrequest_desc = v.UnicodeString(strip=True, required=False)
 

	
 
        ancestor_rev = v.UnicodeString(strip=True, required=True)
 
        merge_rev = v.UnicodeString(strip=True, required=True)
 

	
 
    return _PullRequestForm
 

	
 

	
 
def GistForm(lifetime_options):
 
    class _GistForm(formencode.Schema):
 

	
 
        filename = v.UnicodeString(strip=True, required=False)
 
        filename = All(v.BasePath()(),
 
                       v.UnicodeString(strip=True, required=False))
 
        description = v.UnicodeString(required=False, if_missing='')
 
        lifetime = v.OneOf(lifetime_options)
 
        content = v.UnicodeString(required=True, not_empty=True)
 
        public = v.UnicodeString(required=False, if_missing='')
 
        private = v.UnicodeString(required=False, if_missing='')
 

	
 
    return _GistForm
rhodecode/model/gist.py
Show inline comments
 
@@ -27,147 +27,150 @@ import os
 
import time
 
import logging
 
import traceback
 
import shutil
 

	
 
from pylons.i18n.translation import _
 
from rhodecode.lib.utils2 import safe_unicode, unique_id, safe_int, \
 
    time_to_datetime, safe_str, AttributeDict
 
from rhodecode.lib.compat import json
 
from rhodecode.lib import helpers as h
 
from rhodecode.model import BaseModel
 
from rhodecode.model.db import Gist
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.scm import ScmModel
 

	
 
log = logging.getLogger(__name__)
 

	
 
GIST_STORE_LOC = '.rc_gist_store'
 
GIST_METADATA_FILE = '.rc_gist_metadata'
 

	
 

	
 
class GistModel(BaseModel):
 

	
 
    def _get_gist(self, gist):
 
        """
 
        Helper method to get gist by ID, or gist_access_id as a fallback
 

	
 
        :param gist: GistID, gist_access_id, or Gist instance
 
        """
 
        return self._get_instance(Gist, gist,
 
                                  callback=Gist.get_by_access_id)
 

	
 
    def __delete_gist(self, gist):
 
        """
 
        removes gist from filesystem
 

	
 
        :param gist: gist object
 
        """
 
        root_path = RepoModel().repos_path
 
        rm_path = os.path.join(root_path, GIST_STORE_LOC, gist.gist_access_id)
 
        log.info("Removing %s" % (rm_path))
 
        shutil.rmtree(rm_path)
 

	
 
    def get_gist(self, gist):
 
        return self._get_gist(gist)
 

	
 
    def get_gist_files(self, gist_access_id):
 
        """
 
        Get files for given gist
 

	
 
        :param gist_access_id:
 
        """
 
        repo = Gist.get_by_access_id(gist_access_id)
 
        cs = repo.scm_instance.get_changeset()
 
        return (
 
         cs, [n for n in cs.get_node('/')]
 
        )
 

	
 
    def create(self, description, owner, gist_mapping,
 
               gist_type=Gist.GIST_PUBLIC, lifetime=-1):
 
        """
 

	
 
        :param description: description of the gist
 
        :param owner: user who created this gist
 
        :param gist_mapping: mapping {filename:{'content':content},...}
 
        :param gist_type: type of gist private/public
 
        :param lifetime: in minutes, -1 == forever
 
        """
 
        gist_id = safe_unicode(unique_id(20))
 
        lifetime = safe_int(lifetime, -1)
 
        gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        log.debug('set GIST expiration date to: %s'
 
                  % (time_to_datetime(gist_expires)
 
                   if gist_expires != -1 else 'forever'))
 
        #create the Database version
 
        gist = Gist()
 
        gist.gist_description = description
 
        gist.gist_access_id = gist_id
 
        gist.gist_owner = owner.user_id
 
        gist.gist_expires = gist_expires
 
        gist.gist_type = safe_unicode(gist_type)
 
        self.sa.add(gist)
 
        self.sa.flush()
 
        if gist_type == Gist.GIST_PUBLIC:
 
            # use DB ID for easy to use GIST ID
 
            gist_id = safe_unicode(gist.gist_id)
 
            gist.gist_access_id = gist_id
 
            self.sa.add(gist)
 

	
 
        gist_repo_path = os.path.join(GIST_STORE_LOC, gist_id)
 
        log.debug('Creating new %s GIST repo in %s' % (gist_type, gist_repo_path))
 
        repo = RepoModel()._create_repo(repo_name=gist_repo_path, alias='hg',
 
                                        parent=None)
 

	
 
        processed_mapping = {}
 
        for filename in gist_mapping:
 
            if filename != os.path.basename(filename):
 
                raise Exception('Filename cannot be inside a directory')
 

	
 
            content = gist_mapping[filename]['content']
 
            #TODO: expand support for setting explicit lexers
 
#             if lexer is None:
 
#                 try:
 
#                     lexer = pygments.lexers.guess_lexer_for_filename(filename,content)
 
#                 except pygments.util.ClassNotFound:
 
#                     lexer = 'text'
 
            processed_mapping[filename] = {'content': content}
 

	
 
        # now create single multifile commit
 
        message = 'added file'
 
        message += 's: ' if len(processed_mapping) > 1 else ': '
 
        message += ', '.join([x for x in processed_mapping])
 

	
 
        #fake RhodeCode Repository object
 
        fake_repo = AttributeDict(dict(
 
            repo_name=gist_repo_path,
 
            scm_instance_no_cache=lambda: repo,
 
        ))
 
        ScmModel().create_nodes(
 
            user=owner.user_id, repo=fake_repo,
 
            message=message,
 
            nodes=processed_mapping,
 
            trigger_push_hook=False
 
        )
 
        # store metadata inside the gist, this can be later used for imports
 
        # or gist identification
 
        metadata = {
 
            'gist_db_id': gist.gist_id,
 
            'gist_access_id': gist.gist_access_id,
 
            'gist_owner_id': owner.user_id,
 
            'gist_type': gist.gist_type,
 
            'gist_exipres': gist.gist_expires
 
        }
 
        with open(os.path.join(repo.path, '.hg', GIST_METADATA_FILE), 'wb') as f:
 
            f.write(json.dumps(metadata))
 
        return gist
 

	
 
    def delete(self, gist, fs_remove=True):
 
        gist = self._get_gist(gist)
 

	
 
        try:
 
            self.sa.delete(gist)
 
            if fs_remove:
 
                self.__delete_gist(gist)
 
            else:
 
                log.debug('skipping removal from filesystem')
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
rhodecode/model/validators.py
Show inline comments
 
@@ -675,135 +675,153 @@ def UniqSystemEmail(old_data={}):
 
                user = User.get_by_email(value, case_insensitive=True)
 
                if user:
 
                    msg = M(self, 'email_taken', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(email=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidSystemEmail():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'non_existing_email': _(u'e-mail "%(email)s" does not exist.')
 
        }
 

	
 
        def _to_python(self, value, state):
 
            return value.lower()
 

	
 
        def validate_python(self, value, state):
 
            user = User.get_by_email(value, case_insensitive=True)
 
            if user is None:
 
                msg = M(self, 'non_existing_email', state, email=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(email=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def LdapLibValidator():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 

	
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                import ldap
 
                ldap  # pyflakes silence !
 
            except ImportError:
 
                raise LdapImportError()
 

	
 
    return _validator
 

	
 

	
 
def AttrLoginValidator():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_cn':
 
                  _(u'The LDAP Login attribute of the CN must be specified - '
 
                    'this is the name of the attribute that is equivalent '
 
                    'to "username"')
 
        }
 
        messages['empty'] = messages['invalid_cn']
 

	
 
    return _validator
 

	
 

	
 
def NotReviewedRevisions(repo_id):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'rev_already_reviewed':
 
                  _(u'Revisions %(revs)s are already part of pull request '
 
                    'or have set status')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            # check revisions if they are not reviewed, or a part of another
 
            # pull request
 
            statuses = ChangesetStatus.query()\
 
                .filter(ChangesetStatus.revision.in_(value))\
 
                .filter(ChangesetStatus.repo_id == repo_id)\
 
                .all()
 

	
 
            errors = []
 
            for cs in statuses:
 
                if cs.pull_request_id:
 
                    errors.append(['pull_req', cs.revision[:12]])
 
                elif cs.status:
 
                    errors.append(['status', cs.revision[:12]])
 

	
 
            if errors:
 
                revs = ','.join([x[1] for x in errors])
 
                msg = M(self, 'rev_already_reviewed', state, revs=revs)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(revisions=revs)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidIp():
 
    class _validator(CIDR):
 
        messages = dict(
 
            badFormat=_('Please enter a valid IPv4 or IpV6 address'),
 
            illegalBits=_('The network size (bits) must be within the range'
 
                ' of 0-32 (not %(bits)r)'))
 
                ' of 0-32 (not %(bits)r)')
 
        )
 

	
 
        def to_python(self, value, state):
 
            v = super(_validator, self).to_python(value, state)
 
            v = v.strip()
 
            net = ipaddr.IPNetwork(address=v)
 
            if isinstance(net, ipaddr.IPv4Network):
 
                #if IPv4 doesn't end with a mask, add /32
 
                if '/' not in value:
 
                    v += '/32'
 
            if isinstance(net, ipaddr.IPv6Network):
 
                #if IPv6 doesn't end with a mask, add /128
 
                if '/' not in value:
 
                    v += '/128'
 
            return v
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                addr = value.strip()
 
                #this raises an ValueError if address is not IpV4 or IpV6
 
                ipaddr.IPNetwork(address=addr)
 
            except ValueError:
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 

	
 
    return _validator
 

	
 

	
 
def FieldKey():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            badFormat=_('Key name can only consist of letters, '
 
                        'underscore, dash or numbers'),)
 
                        'underscore, dash or numbers')
 
        )
 

	
 
        def validate_python(self, value, state):
 
            if not re.match('[a-zA-Z0-9_-]+$', value):
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 
    return _validator
 

	
 

	
 
def BasePath():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            badPath=_('Filename cannot be inside a directory')
 
        )
 

	
 
        def _to_python(self, value, state):
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            if value != os.path.basename(value):
 
                raise formencode.Invalid(self.message('badPath', state),
 
                                         value, state)
 
    return _validator
rhodecode/tests/functional/test_admin_gists.py
Show inline comments
 
import datetime
 

	
 
from rhodecode.tests import *
 
from rhodecode.model.gist import GistModel
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User, Gist
 

	
 

	
 
def _create_gist(f_name, content='some gist', lifetime=-1,
 
                 description='gist-desc', gist_type='public',
 
                 owner=TEST_USER_ADMIN_LOGIN):
 
    gist_mapping = {
 
        f_name: {'content': content}
 
    }
 
    user = User.get_by_username(owner)
 
    gist = GistModel().create(description, owner=user,
 
                       gist_mapping=gist_mapping, gist_type=gist_type,
 
                       lifetime=lifetime)
 
    Session().commit()
 
    return gist
 

	
 

	
 
class TestGistsController(TestController):
 

	
 
    def tearDown(self):
 
        for g in Gist.get_all():
 
            GistModel().delete(g)
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('gists'))
 
        # Test response...
 
        response.mustcontain('There are no gists yet')
 

	
 
        _create_gist('gist1')
 
        _create_gist('gist2', lifetime=1400)
 
        _create_gist('gist3', description='gist3-desc')
 
        _create_gist('gist4', gist_type='private')
 
        response = self.app.get(url('gists'))
 
        # Test response...
 
        response.mustcontain('gist:1')
 
        response.mustcontain('gist:2')
 
        response.mustcontain('Expires: in 23 hours')  # we don't care about the end
 
        response.mustcontain('gist:3')
 
        response.mustcontain('gist3-desc')
 
        response.mustcontain(no=['gist:4'])
 

	
 
    def test_index_private_gists(self):
 
        self.log_user()
 
        gist = _create_gist('gist5', gist_type='private')
 
        response = self.app.get(url('gists', private=1))
 
        # Test response...
 

	
 
        #and privates
 
        response.mustcontain('gist:%s' % gist.gist_access_id)
 

	
 
    def test_create_missing_description(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1}, status=200)
 

	
 
        response.mustcontain('Missing value')
 

	
 
    def test_create(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': 'foo',
 
                                         'public': 'public'},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: foo')
 
        response.mustcontain('gist test')
 
        response.mustcontain('<div class="ui-btn green badge">Public gist</div>')
 

	
 
    def test_create_with_path_with_dirs(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': '/home/foo',
 
                                         'public': 'public'},
 
                                 status=200)
 
        response.mustcontain('Filename cannot be inside a directory')
 

	
 
    def test_access_expired_gist(self):
 
        self.log_user()
 
        gist = _create_gist('never-see-me')
 
        gist.gist_expires = 0  # 1970
 
        Session().add(gist)
 
        Session().commit()
 

	
 
        response = self.app.get(url('gist', id=gist.gist_access_id), status=404)
 

	
 
    def test_create_private(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'private gist test',
 
                                         'filename': 'private-foo',
 
                                         'private': 'private'},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: private-foo<')
 
        response.mustcontain('private gist test')
 
        response.mustcontain('<div class="ui-btn yellow badge">Private gist</div>')
 

	
 
    def test_create_with_description(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': 'foo-desc',
 
                                         'description': 'gist-desc',
 
                                         'public': 'public'},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: foo-desc')
 
        response.mustcontain('gist test')
 
        response.mustcontain('gist-desc')
 
        response.mustcontain('<div class="ui-btn green badge">Public gist</div>')
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_gist'))
 

	
 
    def test_update(self):
 
        self.skipTest('not implemented')
 
        response = self.app.put(url('gist', id=1))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        gist = _create_gist('delete-me')
 
        response = self.app.delete(url('gist', id=gist.gist_id))
 
        self.checkSessionFlash(response, 'Deleted gist %s' % gist.gist_id)
 

	
 
    def test_delete_normal_user_his_gist(self):
 
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        gist = _create_gist('delete-me', owner=TEST_USER_REGULAR_LOGIN)
 
        response = self.app.delete(url('gist', id=gist.gist_id))
 
        self.checkSessionFlash(response, 'Deleted gist %s' % gist.gist_id)
 

	
 
    def test_delete_normal_user_not_his_own_gist(self):
 
        self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        gist = _create_gist('delete-me')
 
        response = self.app.delete(url('gist', id=gist.gist_id), status=403)
 

	
 
    def test_show(self):
 
        gist = _create_gist('gist-show-me')
 
        response = self.app.get(url('gist', id=gist.gist_access_id))
 
        response.mustcontain('added file: gist-show-me<')
 
        response.mustcontain('test_admin (RhodeCode Admin) - created')
 
        response.mustcontain('gist-desc')
 
        response.mustcontain('<div class="ui-btn green badge">Public gist</div>')
 

	
 
    def test_edit(self):
 
        self.skipTest('not implemented')
 
        response = self.app.get(url('edit_gist', id=1))
0 comments (0 inline, 0 general)