Changeset - 54bc7a89f090
[Not reviewed]
beta
0 3 0
Marcin Kuzminski - 13 years ago 2013-05-11 22:43:54
marcin@python-works.com
gists: add some API related code improvements
3 files changed with 50 insertions and 7 deletions:
0 comments (0 inline, 0 general)
rhodecode/model/db.py
Show inline comments
 
@@ -2107,64 +2107,107 @@ class UserNotification(Base, BaseModel):
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    user_id = Column('user_id', Integer(), ForeignKey('users.user_id'), primary_key=True)
 
    notification_id = Column("notification_id", Integer(), ForeignKey('notifications.notification_id'), primary_key=True)
 
    read = Column('read', Boolean, default=False)
 
    sent_on = Column('sent_on', DateTime(timezone=False), nullable=True, unique=None)
 

	
 
    user = relationship('User', lazy="joined")
 
    notification = relationship('Notification', lazy="joined",
 
                                order_by=lambda: Notification.created_on.desc(),)
 

	
 
    def mark_as_read(self):
 
        self.read = True
 
        Session().add(self)
 

	
 

	
 
class Gist(Base, BaseModel):
 
    __tablename__ = 'gists'
 
    __table_args__ = (
 
        Index('g_gist_access_id_idx', 'gist_access_id'),
 
        Index('g_created_on_idx', 'created_on'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'}
 
    )
 
    GIST_PUBLIC = u'public'
 
    GIST_PRIVATE = u'private'
 

	
 
    gist_id = Column('gist_id', Integer(), primary_key=True)
 
    gist_access_id = Column('gist_access_id', UnicodeText(1024))
 
    gist_description = Column('gist_description', UnicodeText(1024))
 
    gist_owner = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=True)
 
    gist_expires = Column('gist_expires', Float(), nullable=False)
 
    gist_type = Column('gist_type', Unicode(128), nullable=False)
 
    created_on = Column('created_on', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column('modified_at', DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    owner = relationship('User')
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        res = cls.query().filter(cls.gist_access_id == id_).scalar()
 
        if not res:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def get_by_access_id(cls, gist_access_id):
 
        return cls.query().filter(cls.gist_access_id==gist_access_id).scalar()
 
        return cls.query().filter(cls.gist_access_id == gist_access_id).scalar()
 

	
 
    def gist_url(self):
 
        from pylons import url
 
        return url('gist', id=self.gist_access_id, qualified=True)
 

	
 
    @classmethod
 
    def base_path(cls):
 
        """
 
        Returns base path when all gists are stored
 

	
 
        :param cls:
 
        """
 
        from rhodecode.model.gist import GIST_STORE_LOC
 
        q = Session().query(RhodeCodeUi)\
 
            .filter(RhodeCodeUi.ui_key == URL_SEP)
 
        q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
 
        return os.path.join(q.one().ui_value, GIST_STORE_LOC)
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating gist related data for API
 
        """
 
        gist = self
 
        data = dict(
 
            gist_id=gist.gist_id,
 
            type=gist.gist_type,
 
            access_id=gist.gist_access_id,
 
            description=gist.gist_description,
 
            url=gist.gist_url(),
 
            expires=gist.gist_expires,
 
            created_on=gist.created_on,
 
        )
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
        )
 
        data.update(self.get_api_data())
 
        return data
 
    ## SCM functions
 

	
 
    @property
 
    def scm_instance(self):
 
        from rhodecode.lib.vcs import get_repo
 
        base_path = self.base_path()
 
        return get_repo(os.path.join(*map(safe_str,
 
                                          [base_path, self.gist_access_id])))
 

	
 

	
 
class DbMigrateVersion(Base, BaseModel):
 
    __tablename__ = 'db_migrate_version'
 
    __table_args__ = (
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 
    repository_id = Column('repository_id', String(250), primary_key=True)
 
    repository_path = Column('repository_path', Text)
 
    version = Column('version', Integer)
rhodecode/model/gist.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.model.gist
 
    ~~~~~~~~~~~~~~~~~~~~
 

	
 
    gist model for RhodeCode
 

	
 
    :created_on: May 9, 2013
 
    :author: marcink
 
    :copyright: (C) 2011-2013 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
from __future__ import with_statement
 
import os
 
import time
 
import logging
 
import traceback
 
import shutil
 

	
 
from pylons.i18n.translation import _
 
from rhodecode.lib.utils2 import safe_unicode, unique_id, safe_int, \
 
    time_to_datetime, safe_str, AttributeDict
 
from rhodecode.lib.compat import json
 
from rhodecode.lib import helpers as h
 
from rhodecode.model import BaseModel
 
from rhodecode.model.db import Gist
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.scm import ScmModel
 
from rhodecode.lib.vcs import get_repo
 

	
 
log = logging.getLogger(__name__)
 

	
 
GIST_STORE_LOC = '.rc_gist_store'
 
GIST_METADATA_FILE = '.rc_gist_metadata'
 

	
 

	
 
class GistModel(BaseModel):
 

	
 
    def _get_gist(self, gist):
 
        """
 
        Helper method to get gist by ID, or gist_access_id as a fallback
 

	
 
        :param gist: GistID, gist_access_id, or Gist instance
 
        """
 
        return self._get_instance(Gist, gist,
 
                                  callback=Gist.get_by_access_id)
 

	
 
    def __delete_gist(self, gist):
 
        """
 
        removes gist from filesystem
 

	
 
        :param gist: gist object
 
        """
 
        root_path = RepoModel().repos_path
 
        rm_path = os.path.join(root_path, GIST_STORE_LOC, gist.gist_access_id)
 
        log.info("Removing %s" % (rm_path))
 
        shutil.rmtree(rm_path)
 

	
 
    def get_gist(self, gist):
 
        return self._get_gist(gist)
 

	
 
    def get_gist_files(self, gist_access_id):
 
        """
 
        Get files for given gist
 

	
 
        :param gist_access_id:
 
        """
 
        root_path = RepoModel().repos_path
 
        r = get_repo(os.path.join(*map(safe_str,
 
                                [root_path, GIST_STORE_LOC, gist_access_id])))
 
        cs = r.get_changeset()
 
        repo = Gist.get_by_access_id(gist_access_id)
 
        cs = repo.scm_instance.get_changeset()
 
        return (
 
         cs, [n for n in cs.get_node('/')]
 
        )
 

	
 
    def create(self, description, owner, gist_mapping,
 
               gist_type=Gist.GIST_PUBLIC, lifetime=-1):
 
        """
 

	
 
        :param description: description of the gist
 
        :param owner: user who created this gist
 
        :param gist_mapping: mapping {filename:{'content':content},...}
 
        :param gist_type: type of gist private/public
 
        :param lifetime: in minutes, -1 == forever
 
        """
 
        gist_id = safe_unicode(unique_id(20))
 
        lifetime = safe_int(lifetime, -1)
 
        gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        log.debug('set GIST expiration date to: %s'
 
                  % (time_to_datetime(gist_expires)
 
                   if gist_expires != -1 else 'forever'))
 
        #create the Database version
 
        gist = Gist()
 
        gist.gist_description = description
 
        gist.gist_access_id = gist_id
 
        gist.gist_owner = owner.user_id
 
        gist.gist_expires = gist_expires
 
        gist.gist_type = safe_unicode(gist_type)
 
        self.sa.add(gist)
 
        self.sa.flush()
 
        if gist_type == Gist.GIST_PUBLIC:
 
            # use DB ID for easy to use GIST ID
 
            gist_id = safe_unicode(gist.gist_id)
 
            gist.gist_access_id = gist_id
 
            self.sa.add(gist)
 

	
 
        gist_repo_path = os.path.join(GIST_STORE_LOC, gist_id)
 
        log.debug('Creating new %s GIST repo in %s' % (gist_type, gist_repo_path))
 
        repo = RepoModel()._create_repo(repo_name=gist_repo_path, alias='hg',
 
                                        parent=None)
 

	
 
        processed_mapping = {}
 
        for filename in gist_mapping:
 
            content = gist_mapping[filename]['content']
 
            #TODO: expand support for setting explicit lexers
 
#             if lexer is None:
 
#                 try:
 
#                     lexer = pygments.lexers.guess_lexer_for_filename(filename,content)
 
#                 except pygments.util.ClassNotFound:
rhodecode/tests/functional/test_admin_gists.py
Show inline comments
 
@@ -71,55 +71,55 @@ class TestGistsController(TestController
 
        response.mustcontain('added file: foo')
 
        response.mustcontain('gist test')
 
        response.mustcontain('<div class="ui-btn green badge">Public gist</div>')
 

	
 
    def test_create_private(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'private gist test',
 
                                         'filename': 'private-foo',
 
                                         'private': 'private'},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: private-foo<')
 
        response.mustcontain('private gist test')
 
        response.mustcontain('<div class="ui-btn yellow badge">Private gist</div>')
 

	
 
    def test_create_with_description(self):
 
        self.log_user()
 
        response = self.app.post(url('gists'),
 
                                 params={'lifetime': -1,
 
                                         'content': 'gist test',
 
                                         'filename': 'foo-desc',
 
                                         'description': 'gist-desc',
 
                                         'public': 'public'},
 
                                 status=302)
 
        response = response.follow()
 
        response.mustcontain('added file: foo-desc')
 
        response.mustcontain('gist test')
 
        response.mustcontain('gist-desc')
 
        response.mustcontain('<div class="ui-btn green badge">Public gist</div>')
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_gist'))
 

	
 
    def test_update(self):
 
        self.skipTest('not implemented')
 
        response = self.app.put(url('gist', id=1))
 

	
 
    def test_delete(self):
 
        self.skipTest('not implemented')
 
        response = self.app.delete(url('gist', id=1))
 

	
 
    def test_show(self):
 
        gist = _create_gist('gist-show-me')
 
        response = self.app.get(url('gist', id=gist.gist_access_id))
 
        response.mustcontain('added file: gist-show-me<')
 
        response.mustcontain('test_admin (RhodeCode Admin) - created just now')
 
        response.mustcontain('test_admin (RhodeCode Admin) - created')
 
        response.mustcontain('gist-desc')
 
        response.mustcontain('<div class="ui-btn green badge">Public gist</div>')
 

	
 
    def test_edit(self):
 
        self.skipTest('not implemented')
 
        response = self.app.get(url('edit_gist', id=1))
0 comments (0 inline, 0 general)