Changeset 034e4fe1ebb2 (branch: beta)
Marcin Kuzminski <marcin@python-works.com> - 2012-06-04 02:56:09
changed dulwich git interface to gitweb + subprocessio
4 files changed with 590 insertions and 4 deletions:
docs/changelog.rst
 
.. _changelog:
 

	
 
=========
 
Changelog
 
=========
 

	
 
1.4.0 (**2012-XX-XX**)
 
----------------------
 

	
 
:status: in-progress
 
:branch: beta
 

	
 
news
 
++++
 
 
 
- new codereview system
 
- email map, allowing users to have multiple email addresses mapped into
 
  their accounts
 
- changed setup-app into setup-rhodecode and added default options to it.
 
- new git repos are created as bare now by default
 
- #464 added links to groups in permission box
 
- #465 mentions autocomplete inside comments boxes
 
- #469 added --update-only option to whoosh to re-index only a given list
  of repos in the index
 
- rhodecode-api CLI client
 
- new git http protocol replaced buggy dulwich implementation.
 
  Now based on pygrack & gitweb
 

	
 
fixes
 
+++++
 

	
 
- improved translations
 
- fixes issue #455 Creating an archive generates an exception on Windows
 
- fixes #448 Download ZIP archive keeps file in /tmp open and results
  in running out of disk space
 
- fixes issue #454 Search results under Windows include preceding
  backslash
 
- fixed issue #450. RhodeCode will no longer crash when a bad revision is
  present in journal data.
 
- fix for issue #417, git execution was broken on Windows for certain
  commands.
 
- fixed #413. Don't disable .git directory for bare repos on deleting
 
- fixed issue #459. Changed the way of obtaining logger in reindex task.
 

	
 
1.3.6 (**2012-05-17**)
 
----------------------
 

	
 
news
 
++++
 

	
 
- chinese traditional translation
 
- changed setup-app into setup-rhodecode and added arguments for auto-setup 
 
  mode that doesn't need user interaction 
 

	
 
fixes
 
+++++
 

	
 
- fixed no scm found warning
 
- fixed __future__ import error on rcextensions
 
- made simplejson required lib for speedup on JSON encoding
 
- fixes #449 bad regex could match more than just revisions when parsing history
 
- don't clear DB session when CELERY_EAGER is turned ON
 

	
 
1.3.5 (**2012-05-10**)
 
----------------------
 

	
 
news
 
++++
 

	
 
- use ext_json for json module
 
- unified annotation view with file source view
 
- notification improvements, better inbox + css
 
- #419 don't strip passwords for login forms, makes RhodeCode
  more compatible with LDAP servers
 
- Added HTTP_X_FORWARDED_FOR as another method of extracting
  IP for pull/push logs; moved all IP extraction to the base controller
 
- #415: Adding comment to changeset causes reload.
  Comments are now added via ajax and don't reload the page
 
- #374 LDAP config is discarded when LDAP can't be activated
 
- limited push/pull operations are now logged for git in the journal
 
- bumped mercurial to 2.2.X series
 
- added support for displaying submodules in file-browser
 
- #421 added bookmarks in changelog view
 

	
 
fixes
 
+++++
 

	
 
- fixed dev-version marker for stable when served from source code
 
- fixed missing permission checks on show forks page
 
- #418 cast to unicode fixes in notification objects
 
- #426 fixed mention extracting regex
 
- fixed remote pulling for git remote repositories
 
- fixed #434: Error when accessing files or changesets of a git repository 
 
  with submodules
 
- fixed issue with empty API keys for users after registration, ref. #438
 
- fixed issue with getting README files from git repositories
 

	
 
1.3.4 (**2012-03-28**)
 
----------------------
 

	
 
news
 
++++
 

	
 
- Whoosh logging is now controlled by the .ini files logging setup
 
- added clone-url into edit form on /settings page
 
- added help text into repo add/edit forms
 
- created rcextensions module with additional mappings (ref #322) and
 
  post push/pull/create repo hooks callbacks
 
- implemented #377 users can view their own permissions on the account page
 
- #399 added inheritance of permissions for users group on repos groups
 
- #401 repository group is automatically pre-selected when adding repos 
 
  inside a repository group
 
- added alternative HTTP 403 response when client failed to authenticate. Helps
  solve issues with Mercurial and LDAP
 
- #402 removed group prefix from repository name when listing repositories 
 
  inside a group
 
- added gravatars into permission view and permissions autocomplete
 
- #347 when running multiple RhodeCode instances, properly invalidates cache 
 
  for all registered servers
 

	
 
fixes
 
+++++
 

	
 
- fixed #390 cache invalidation problems on repos inside group
 
- fixed #385 clone by ID url was losing proxy prefix in URL
 
- fixed some unicode problems with waitress
 
- fixed issue with escaping < and > in changeset commits
 
- fixed error occurring during recursive group creation in API 
 
  create_repo function
 
- fixed #393 py2.5 fixes for routes url generator
 
- fixed #397 Private repository groups shows up before login
 
- fixed #396 fixed problems with revoking users in nested groups
 
- fixed mysql unicode issues + specified InnoDB as default engine with 
 
  utf8 charset
 
- #406 trim long branch/tag names in changelog to not break UI
 
  
 
1.3.3 (**2012-03-02**)
 
----------------------
 

	
 
news
 
++++
 

	
 

	
 
fixes
 
+++++
 

	
 
- fixed some python2.5 compatibility issues 
 
- fixed issue where removed repos were accidentally added as groups after a
  full rescan of paths
 
- fixes #376 Cannot edit user (using container auth)
 
- fixes #378 Invalid image urls on changeset screen with proxy-prefix 
 
  configuration
 
- fixed initial sorting of repos inside repo group
 
- fixes issue when user tried to resubmit same permission into user/user_groups
 
- bumped beaker version that fixes #375 leap error bug
 
- fixed raw_changeset for git. It was generated with hg patch headers
 
- fixed vcs issue with last_changeset for filenodes
 
- fixed missing commit after hook delete
 
- fixed #372 issues with git operation detection that caused a security issue 
 
  for git repos
 

	
 
1.3.2 (**2012-02-28**)
 
----------------------
 

	
 
news
 
++++
 

	
 

	
 
fixes
 
+++++
 

	
 
- fixed git protocol issues with repos-groups
 
- fixed git remote repos validator that prevented cloning remote git repos
 
- fixes #370 trailing slashes fixes for repos and groups
 
- fixes #368 improved git-protocol detection to handle other clients
 
- fixes #366 When Setting Repository Group To Blank Repo Group Wont Be 
 
  Moved To Root
 
- fixes #371 fixed issues with beaker/sqlalchemy and non-ascii cache keys 
 
- fixed #373 missing cascade drop on user_group_to_perm table
 

	
 
1.3.1 (**2012-02-27**)
 
----------------------
 

	
 
news
 
++++
 

	
 

	
 
fixes
 
+++++
 

	
 
- fixed redirection loop occurring when remember-me wasn't checked during login
 
- fixes issues with git blob history generation 
 
- don't fetch branch for git in file history dropdown; it caused unneeded slowness
 

	
 
1.3.0 (**2012-02-26**)
 
----------------------
 

	
 
news
 
++++
 

	
 
- code review, inspired by github code-comments 
 
- #215 rst and markdown README files support
 
- #252 Container-based and proxy pass-through authentication support
 
- #44 branch browser. Filtering of changelog by branches
 
- mercurial bookmarks support
 
- new hover top menu, optimized to add maximum size for important views
 
- configurable clone url template with possibility to specify protocol like
  ssh:// or http:// and also manually alter other parts of clone_url
 
- enabled largefiles extension by default
 
- optimized summary file pages and saved a lot of unused space in them
 
- #239 option to manually mark repository as fork
 
- #320 mapping of commit authors to RhodeCode users
 
- #304 hashes are displayed using monospace font    
 
- diff configuration, toggle white lines and context lines
 
- #307 configurable diffs, whitespace toggle, increasing context lines
 
- sorting on branches, tags and bookmarks using YUI datatable
 
- improved file filter on files page
 
- implements #330 api method for listing nodes at particular revision
 
- #73 added linking issues in commit messages to chosen issue tracker url
rhodecode/lib/middleware/pygrack.py
 
new file 100644
 
import os
 
import socket
 
import logging
 
import subprocess
 

	
 
from webob import Request, Response, exc
 

	
 
from rhodecode.lib import subprocessio
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FileWrapper(object):
 

	
 
    def __init__(self, fd, content_length):
 
        self.fd = fd
 
        self.content_length = content_length
 
        self.remain = content_length
 

	
 
    def read(self, size):
 
        if size <= self.remain:
 
            try:
 
                data = self.fd.read(size)
 
            except socket.error:
 
                raise IOError(self)
 
            self.remain -= size
 
        elif self.remain:
 
            data = self.fd.read(self.remain)
 
            self.remain = 0
 
        else:
 
            data = None
 
        return data
 

	
 
    def __repr__(self):
 
        return '<FileWrapper %s len: %s, read: %s>' % (
 
            self.fd, self.content_length, self.content_length - self.remain
 
        )
 

	
 

	
 
class GitRepository(object):
 
    git_folder_signature = set(['config', 'head', 'info', 'objects', 'refs'])
 
    commands = ['git-upload-pack', 'git-receive-pack']
 

	
 
    def __init__(self, repo_name, content_path):
 
        files = set([f.lower() for f in os.listdir(content_path)])
 
        if not self.git_folder_signature.issubset(files):
            raise OSError('%s missing git signature' % content_path)
 
        self.content_path = content_path
 
        self.valid_accepts = ['application/x-%s-result' %
 
                              c for c in self.commands]
 
        self.repo_name = repo_name
 

	
 
    def _get_fixedpath(self, path):
        """
        Small fix for repo_path: strips the repo name prefix and surrounding
        slashes from the given path.

        :param path: path to fix
        """
        return path.split(self.repo_name, 1)[-1].strip('/')
 

	
 
    def inforefs(self, request, environ):
 
        """
 
        WSGI Response producer for HTTP GET Git Smart
 
        HTTP /info/refs request.
 
        """
 

	
 
        git_command = request.GET['service']
 
        if git_command not in self.commands:
 
            log.debug('command %s not allowed' % git_command)
 
            return exc.HTTPMethodNotAllowed()
 

	
 
        # note to self:
 
        # please, resist the urge to add '\n' to git capture and increment
 
        # line count by 1.
 
        # The code in Git client not only does NOT need '\n', but actually
 
        # blows up if you sprinkle "flush" (0000) as "0001\n".
 
        # It reads binary, per number of bytes specified.
 
        # if you do add '\n' as part of data, count it.
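        # Worked example: for git-upload-pack the advert below is
        # '# service=git-upload-pack' (25 bytes); 25 + 4 = 29 = 0x1d, so the
        # first pkt-line sent is '001d# service=git-upload-pack', followed by
        # the '0000' flush packet appended below.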
 
        smart_server_advert = '# service=%s' % git_command
 
        try:
 
            out = subprocessio.SubprocessIOChunker(
 
                r'git %s --stateless-rpc --advertise-refs "%s"' % (
 
                                git_command[4:], self.content_path),
 
                starting_values=[
 
                    str(hex(len(smart_server_advert) + 4)[2:]
 
                        .rjust(4, '0') + smart_server_advert + '0000')
 
                ]
 
            )
 
        except EnvironmentError, e:
 
            log.exception(e)
 
            raise exc.HTTPExpectationFailed()
 
        resp = Response()
 
        resp.content_type = 'application/x-%s-advertisement' % str(git_command)
 
        resp.app_iter = out
 
        return resp
 

	
 
    def backend(self, request, environ):
 
        """
 
        WSGI Response producer for HTTP POST Git Smart HTTP requests.
 
        Reads commands and data from HTTP POST's body.
 
        returns an iterator obj with contents of git command's
 
        response to stdout
 
        """
 
        git_command = self._get_fixedpath(request.path_info)
 
        if git_command not in self.commands:
 
            log.debug('command %s not allowed' % git_command)
 
            return exc.HTTPMethodNotAllowed()
 

	
 
        if 'CONTENT_LENGTH' in environ:
 
            inputstream = FileWrapper(environ['wsgi.input'],
 
                                      request.content_length)
 
        else:
 
            inputstream = environ['wsgi.input']
 

	
 
        try:
 
            out = subprocessio.SubprocessIOChunker(
 
                r'git %s --stateless-rpc "%s"' % (git_command[4:],
 
                                                  self.content_path),
 
                inputstream=inputstream
 
                )
 
        except EnvironmentError, e:
 
            log.exception(e)
 
            raise exc.HTTPExpectationFailed()
 

	
 
        if git_command in [u'git-receive-pack']:
 
            # updating refs manually after each push.
 
            # Needed for pre-1.7.0.4 git clients using regular HTTP mode.
 
            subprocess.call(u'git --git-dir "%s" '
 
                            'update-server-info' % self.content_path,
 
                            shell=True)
 

	
 
        resp = Response()
 
        resp.content_type = 'application/x-%s-result' % git_command.encode('utf8')
 
        resp.app_iter = out
 
        return resp
 

	
 
    def __call__(self, environ, start_response):
 
        request = Request(environ)
 
        _path = self._get_fixedpath(request.path_info)
 
        if _path.startswith('info/refs'):
 
            app = self.inforefs
 
        elif [a for a in self.valid_accepts if a in request.accept]:
            app = self.backend
        else:
            # neither an info/refs GET nor a smart HTTP POST we can handle
            return exc.HTTPBadRequest()(environ, start_response)
 
        try:
 
            resp = app(request, environ)
 
        except exc.HTTPException, e:
 
            resp = e
 
            log.exception(e)
 
        except Exception, e:
 
            log.exception(e)
 
            resp = exc.HTTPInternalServerError()
 
        return resp(environ, start_response)
 

	
 

	
 
class GitDirectory(object):
 

	
 
    def __init__(self, repo_root, repo_name):
 
        repo_location = os.path.join(repo_root, repo_name)
 
        if not os.path.isdir(repo_location):
 
            raise OSError(repo_location)
 

	
 
        self.content_path = repo_location
 
        self.repo_name = repo_name
 
        self.repo_location = repo_location
 

	
 
    def __call__(self, environ, start_response):
 
        content_path = self.content_path
 
        try:
 
            app = GitRepository(self.repo_name, content_path)
 
        except (AssertionError, OSError):
 
            if os.path.isdir(os.path.join(content_path, '.git')):
 
                app = GitRepository(self.repo_name,
                                    os.path.join(content_path, '.git'))
 
            else:
 
                return exc.HTTPNotFound()(environ, start_response)
 
        return app(environ, start_response)
 

	
 

	
 
def make_wsgi_app(repo_name, repo_root):
 
    return GitDirectory(repo_root, repo_name)
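
# Usage sketch (illustrative names/paths, not part of RhodeCode's wiring):
# serving a bare repository located at /srv/git/myrepo.git with any WSGI
# server could look roughly like:
#
#   app = make_wsgi_app(repo_name='myrepo.git', repo_root='/srv/git')
#   from wsgiref.simple_server import make_server
#   make_server('127.0.0.1', 8001, app).serve_forever()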
rhodecode/lib/middleware/simplegit.py
 
@@ -29,273 +29,275 @@ import re
 
import logging
 
import traceback
 

	
 
from dulwich import server as dulserver
 

	
 

	
 
class SimpleGitUploadPackHandler(dulserver.UploadPackHandler):
 

	
 
    def handle(self):
 
        write = lambda x: self.proto.write_sideband(1, x)
 

	
 
        graph_walker = dulserver.ProtocolGraphWalker(self,
 
                                                     self.repo.object_store,
 
                                                     self.repo.get_peeled)
 
        objects_iter = self.repo.fetch_objects(
 
          graph_walker.determine_wants, graph_walker, self.progress,
 
          get_tagged=self.get_tagged)
 

	
 
        # Did the process short-circuit (e.g. in a stateless RPC call)? Note
 
        # that the client still expects a 0-object pack in most cases.
 
        if objects_iter is None:
 
            return
 

	
 
        self.progress("counting objects: %d, done.\n" % len(objects_iter))
 
        dulserver.write_pack_objects(dulserver.ProtocolFile(None, write),
 
                                     objects_iter)
 
        messages = []
 
        messages.append('thank you for using rhodecode')
 

	
 
        for msg in messages:
 
            self.progress(msg + "\n")
 
        # we are done
 
        self.proto.write("0000")
 

	
 

	
 
dulserver.DEFAULT_HANDLERS = {
 
  #git-ls-remote, git-clone, git-fetch and git-pull
 
  'git-upload-pack': SimpleGitUploadPackHandler,
 
  #git-push
 
  'git-receive-pack': dulserver.ReceivePackHandler,
 
}
 

	
 
from dulwich.repo import Repo
 
from dulwich.web import make_wsgi_chain
 

	
 
from paste.httpheaders import REMOTE_USER, AUTH_TYPE
 

	
 
from rhodecode.lib.utils2 import safe_str
 
from rhodecode.lib.base import BaseVCSController
 
from rhodecode.lib.auth import get_container_username
 
from rhodecode.lib.utils import is_valid_repo, make_ui
 
from rhodecode.model.db import User
 

	
 
from webob.exc import HTTPNotFound, HTTPForbidden, HTTPInternalServerError
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
GIT_PROTO_PAT = re.compile(r'^/(.+)/(info/refs|git-upload-pack|git-receive-pack)')
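# Example: PATH_INFO '/myrepo/info/refs' matches, with group(1) == 'myrepo'
# and group(2) == 'info/refs'; regular web UI paths such as '/myrepo/changelog'
# do not match ('myrepo' is just an illustrative repository name).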
 

	
 

	
 
def is_git(environ):
 
    path_info = environ['PATH_INFO']
 
    isgit_path = GIT_PROTO_PAT.match(path_info)
 
    log.debug('pathinfo: %s detected as GIT %s' % (
 
        path_info, isgit_path is not None)
 
    )
 
    return isgit_path
 

	
 

	
 
class SimpleGit(BaseVCSController):
 

	
 
    def _handle_request(self, environ, start_response):
 

	
 
        if not is_git(environ):
 
            return self.application(environ, start_response)
 

	
 
        ipaddr = self._get_ip_addr(environ)
 
        username = None
 
        self._git_first_op = False
 
        # skip passing error to error controller
 
        environ['pylons.status_code_redirect'] = True
 

	
 
        #======================================================================
 
        # EXTRACT REPOSITORY NAME FROM ENV
 
        #======================================================================
 
        try:
 
            repo_name = self.__get_repository(environ)
 
            log.debug('Extracted repo name is %s' % repo_name)
 
        except:
 
            return HTTPInternalServerError()(environ, start_response)
 

	
 
        # quick check if that dir exists...
 
        if is_valid_repo(repo_name, self.basepath) is False:
 
            return HTTPNotFound()(environ, start_response)
 

	
 
        #======================================================================
 
        # GET ACTION PULL or PUSH
 
        #======================================================================
 
        action = self.__get_action(environ)
 

	
 
        #======================================================================
 
        # CHECK ANONYMOUS PERMISSION
 
        #======================================================================
 
        if action in ['pull', 'push']:
 
            anonymous_user = self.__get_user('default')
 
            username = anonymous_user.username
 
            anonymous_perm = self._check_permission(action, anonymous_user,
 
                                                    repo_name)
 

	
 
            if anonymous_perm is not True or anonymous_user.active is False:
 
                if anonymous_perm is not True:
 
                    log.debug('Not enough credentials to access this '
 
                              'repository as anonymous user')
 
                if anonymous_user.active is False:
 
                    log.debug('Anonymous access is disabled, running '
 
                              'authentication')
 
                #==============================================================
 
                # DEFAULT PERM FAILED OR ANONYMOUS ACCESS IS DISABLED SO WE
 
                # NEED TO AUTHENTICATE AND ASK FOR AUTH USER PERMISSIONS
 
                #==============================================================
 

	
 
                # Attempting to retrieve username from the container
 
                username = get_container_username(environ, self.config)
 

	
 
                # If not authenticated by the container, running basic auth
 
                if not username:
 
                    self.authenticate.realm = \
 
                        safe_str(self.config['rhodecode_realm'])
 
                    result = self.authenticate(environ)
 
                    if isinstance(result, str):
 
                        AUTH_TYPE.update(environ, 'basic')
 
                        REMOTE_USER.update(environ, result)
 
                        username = result
 
                    else:
 
                        return result.wsgi_application(environ, start_response)
 

	
 
                #==============================================================
 
                # CHECK PERMISSIONS FOR THIS REQUEST USING GIVEN USERNAME
 
                #==============================================================
 
                if action in ['pull', 'push']:
 
                    try:
 
                        user = self.__get_user(username)
 
                        if user is None or not user.active:
 
                            return HTTPForbidden()(environ, start_response)
 
                        username = user.username
 
                    except:
 
                        log.error(traceback.format_exc())
 
                        return HTTPInternalServerError()(environ,
 
                                                         start_response)
 

	
 
                    #check permissions for this repository
 
                    perm = self._check_permission(action, user, repo_name)
 
                    if perm is not True:
 
                        return HTTPForbidden()(environ, start_response)
 
        extras = {
 
            'ip': ipaddr,
 
            'username': username,
 
            'action': action,
 
            'repository': repo_name,
 
            'scm': 'git',
 
        }
 

	
 
        #===================================================================
 
        # GIT REQUEST HANDLING
 
        #===================================================================
 
        repo_path = os.path.join(safe_str(self.basepath), safe_str(repo_name))
 
        log.debug('Repository path is %s' % repo_path)
 

	
 
        baseui = make_ui('db')
 
        self.__inject_extras(repo_path, baseui, extras)
 

	
 
        try:
 
            # invalidate cache on push
 
            if action == 'push':
 
                self._invalidate_cache(repo_name)
 
            self._handle_githooks(repo_name, action, baseui, environ)
 

	
 
            log.info('%s action on GIT repo "%s"' % (action, repo_name))
 
            app = self.__make_app(repo_name, repo_path)
 
            return app(environ, start_response)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            return HTTPInternalServerError()(environ, start_response)
 

	
 
    def __make_app(self, repo_name, repo_path):
        """
        Make a wsgi application using pygrack

        :param repo_name: name of the repository
        :param repo_path: full path to the repository
        """
        from rhodecode.lib.middleware.pygrack import make_wsgi_app
        app = make_wsgi_app(
            repo_root=os.path.dirname(repo_path),
            repo_name=repo_name,
        )
        return app
 

	
 
    def __get_repository(self, environ):
 
        """
 
        Gets repository name out of PATH_INFO header
 

	
 
        :param environ: environ where PATH_INFO is stored
 
        """
 
        try:
 
            environ['PATH_INFO'] = self._get_by_id(environ['PATH_INFO'])
 
            repo_name = GIT_PROTO_PAT.match(environ['PATH_INFO']).group(1)
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
        return repo_name
 

	
 
    def __get_user(self, username):
 
        return User.get_by_username(username)
 

	
 
    def __get_action(self, environ):
 
        """
 
        Maps git request commands into a pull or push command.
 

	
 
        :param environ:
 
        """
 
        service = environ['QUERY_STRING'].split('=')
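        # e.g. QUERY_STRING 'service=git-upload-pack' splits into
        # ['service', 'git-upload-pack'] and maps to 'pull' below, while
        # 'service=git-receive-pack' maps to 'push'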
 

	
 
        if len(service) > 1:
 
            service_cmd = service[1]
 
            mapping = {
 
                'git-receive-pack': 'push',
 
                'git-upload-pack': 'pull',
 
            }
 
            op = mapping[service_cmd]
 
            self._git_stored_op = op
 
            return op
 
        else:
 
            # try to fallback to stored variable as we don't know if the last
 
            # operation is pull/push
 
            op = getattr(self, '_git_stored_op', 'pull')
 
        return op
 

	
 
    def _handle_githooks(self, repo_name, action, baseui, environ):
 
        from rhodecode.lib.hooks import log_pull_action, log_push_action
 
        service = environ['QUERY_STRING'].split('=')
 
        if len(service) < 2:
 
            return
 

	
 
        from rhodecode.model.db import Repository
 
        _repo = Repository.get_by_repo_name(repo_name)
 
        _repo = _repo.scm_instance
 
        _repo._repo.ui = baseui
 

	
 
        push_hook = 'pretxnchangegroup.push_logger'
 
        pull_hook = 'preoutgoing.pull_logger'
 
        _hooks = dict(baseui.configitems('hooks')) or {}
 
        if action == 'push' and _hooks.get(push_hook):
 
            log_push_action(ui=baseui, repo=_repo._repo)
 
        elif action == 'pull' and _hooks.get(pull_hook):
 
            log_pull_action(ui=baseui, repo=_repo._repo)
 

	
 
    def __inject_extras(self, repo_path, baseui, extras={}):
 
        """
 
        Injects some extra params into baseui instance
 

	
 
        :param baseui: baseui instance
 
        :param extras: dict with extra params to put into baseui
 
        """
 

	
 
        # make our hgweb quiet so it doesn't print output
 
        baseui.setconfig('ui', 'quiet', 'true')
 

	
 
        #inject some additional parameters that will be available in ui
 
        #for hooks
 
        for k, v in extras.items():
 
            baseui.setconfig('rhodecode_extras', k, v)
rhodecode/lib/subprocessio.py
 
new file 100644
 
'''
 
Module provides a class that wraps communication over subprocess.Popen
input, output and error streams in a meaningful, non-blocking, concurrent
stream processor, exposing the output data as an iterator fit to be a
return value passed by a WSGI application to a WSGI server per PEP 3333.
 

	
 
Copyright (c) 2011  Daniel Dotsenko <dotsa@hotmail.com>
 

	
 
This file is part of git_http_backend.py Project.
 

	
 
git_http_backend.py Project is free software: you can redistribute it and/or
 
modify it under the terms of the GNU Lesser General Public License as
 
published by the Free Software Foundation, either version 2.1 of the License,
 
or (at your option) any later version.
 

	
 
git_http_backend.py Project is distributed in the hope that it will be useful,
 
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 
GNU Lesser General Public License for more details.
 

	
 
You should have received a copy of the GNU Lesser General Public License
 
along with git_http_backend.py Project.
 
If not, see <http://www.gnu.org/licenses/>.
 
'''
 
import os
 
import subprocess
 
import threading
 
from collections import deque
 

	
 

	
 
class StreamFeeder(threading.Thread):
 
    """
 
    Normal writing into a pipe-like object blocks once the buffer is filled.
    This thread allows feeding data from a file-like object into a pipe
    without blocking the main thread.
    We close the inpipe once the end of the source stream is reached.
 
    """
 
    def __init__(self, source):
 
        super(StreamFeeder, self).__init__()
 
        self.daemon = True
 
        filelike = False
 
        self.bytes = b''
 
        if type(source) in (type(''), bytes, bytearray): # string-like
 
            self.bytes = bytes(source)
 
        else: # can be either file pointer or file-like
 
            if type(source) in (int, long): # file pointer it is
 
                ## converting file descriptor (int) stdin into file-like
 
                try:
 
                    source = os.fdopen(source, 'rb', 16384)
 
                except:
 
                    pass
 
            # let's see if source is file-like by now
 
            try:
 
                filelike = source.read
 
            except:
 
                pass
 
        if not filelike and not self.bytes:
 
            raise TypeError("StreamFeeder's source object must be a readable file-like, a file descriptor, or a string-like.")
 
        self.source = source
 
        self.readiface, self.writeiface = os.pipe()
 

	
 
    def run(self):
 
        t = self.writeiface
 
        if self.bytes:
 
            os.write(t, self.bytes)
 
        else:
 
            s = self.source
 
            b = s.read(4096)
 
            while b:
 
                os.write(t, b)
 
                b = s.read(4096)
 
        os.close(t)
 

	
 
    @property
 
    def output(self):
 
        return self.readiface
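
# Usage sketch (illustrative; 'wsgi_input' stands for any readable file-like):
# feed a request body into a subprocess stdin without blocking this thread:
#
#   feeder = StreamFeeder(wsgi_input)
#   feeder.start()
#   proc = subprocess.Popen('git receive-pack /path/to/repo', shell=True,
#                           stdin=feeder.output, stdout=subprocess.PIPE)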
 

	
 

	
 
class InputStreamChunker(threading.Thread):
 
    def __init__(self, source, target, buffer_size, chunk_size):
 

	
 
        super(InputStreamChunker, self).__init__()
 

	
 
        self.daemon = True  # die die die.
 

	
 
        self.source = source
 
        self.target = target
 
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
 
        self.chunk_size = chunk_size
 

	
 
        self.data_added = threading.Event()
 
        self.data_added.clear()
 

	
 
        self.keep_reading = threading.Event()
 
        self.keep_reading.set()
 

	
 
        self.EOF = threading.Event()
 
        self.EOF.clear()
 

	
 
        self.go = threading.Event()
 
        self.go.set()
 

	
 
    def stop(self):
 
        self.go.clear()
 
        self.EOF.set()
 
        try:
 
            # this is not proper, but is done to force the reader thread let
 
            # go of the input because, if successful, .close() will send EOF
 
            # down the pipe.
 
            self.source.close()
 
        except:
 
            pass
 

	
 
    def run(self):
 
        s = self.source
 
        t = self.target
 
        cs = self.chunk_size
 
        ccm = self.chunk_count_max
 
        kr = self.keep_reading
 
        da = self.data_added
 
        go = self.go
 
        b = s.read(cs)
 
        while b and go.is_set():
 
            if len(t) > ccm:
 
                kr.clear()
 
                kr.wait(2)
 
#                # this only works on 2.7.x and up
 
#                if not kr.wait(10):
 
#                    raise Exception("Timed out while waiting for input to be read.")
 
                # instead we'll use this
 
                if len(t) > ccm + 3:
 
                    raise IOError("Timed out while waiting for input from subprocess.")
 
            t.append(b)
 
            da.set()
 
            b = s.read(cs)
 
        self.EOF.set()
 
        da.set()  # for cases when done but there was no input.
 

	
 

	
 
class BufferedGenerator():
 
    '''
    Class behaves as a non-blocking, buffered pipe reader.
    Reads chunks of data (through a thread) from a blocking pipe, and attaches
    these to an array (deque) of chunks.
    Reading is halted in the thread when the maximum number of chunks is buffered.
    The .next() method may operate in a blocking or non-blocking fashion, either
    by yielding '' if no data is ready to be sent, or by not returning until
    there is some data to send.
    When we get EOF from the underlying source pipe we set a marker to raise
    StopIteration after the last chunk of data is yielded.
    '''
 

	
 
    def __init__(self, source, buffer_size=65536, chunk_size=4096,
 
                 starting_values=[], bottomless=False):
 

	
 
        if bottomless:
 
            maxlen = int(buffer_size / chunk_size)
 
        else:
 
            maxlen = None
 

	
 
        self.data = deque(starting_values, maxlen)
 

	
 
        self.worker = InputStreamChunker(source, self.data, buffer_size,
 
                                         chunk_size)
 
        if starting_values:
 
            self.worker.data_added.set()
 
        self.worker.start()
 

	
 
    ####################
 
    # Generator's methods
 
    ####################
 

	
 
    def __iter__(self):
 
        return self
 

	
 
    def next(self):
 
        while not len(self.data) and not self.worker.EOF.is_set():
 
            self.worker.data_added.clear()
 
            self.worker.data_added.wait(0.2)
 
        if len(self.data):
 
            self.worker.keep_reading.set()
 
            return bytes(self.data.popleft())
 
        elif self.worker.EOF.is_set():
 
            raise StopIteration
 

	
 
    def throw(self, type, value=None, traceback=None):
 
        if not self.worker.EOF.is_set():
 
            raise type(value)
 

	
 
    def start(self):
 
        self.worker.start()
 

	
 
    def stop(self):
 
        self.worker.stop()
 

	
 
    def close(self):
 
        try:
 
            self.worker.stop()
 
            self.throw(GeneratorExit)
 
        except (GeneratorExit, StopIteration):
 
            pass
 

	
 
    def __del__(self):
 
        self.close()
 

	
 
    ####################
 
    # Threaded reader's infrastructure.
 
    ####################
 
    @property
 
    def input(self):
 
        return self.worker.w
 

	
 
    @property
 
    def data_added_event(self):
 
        return self.worker.data_added
 

	
 
    @property
 
    def data_added(self):
 
        return self.worker.data_added.is_set()
 

	
 
    @property
 
    def reading_paused(self):
 
        return not self.worker.keep_reading.is_set()
 

	
 
    @property
 
    def done_reading_event(self):
 
        '''
 
        Done reading does not mean that the iterator's buffer is empty.
        The iterator might be done reading from the underlying source, but the
        read chunks might still be available for serving through the .next() method.
 

	
 
        @return An Event class instance.
 
        '''
 
        return self.worker.EOF
 

	
 
    @property
 
    def done_reading(self):
 
        '''
 
        Done reading does not mean that the iterator's buffer is empty.
        The iterator might be done reading from the underlying source, but the
        read chunks might still be available for serving through the .next() method.

        @return A bool value.
 
        '''
 
        return self.worker.EOF.is_set()
 

	
 
    @property
 
    def length(self):
 
        '''
 
        returns int.

        This is the length of the queue of chunks, not the length of
        the combined contents in those chunks.

        __len__() cannot be meaningfully implemented because this
        reader is just flying through bottomless content and
        can only know the length of what it has already seen.

        If __len__() per WSGI PEP 3333 returns a value, the server sets the
        response's length to that. In order not to confuse WSGI PEP 3333
        servers, we will not implement __len__ at all.
 
        '''
 
        return len(self.data)
 

	
 
    def prepend(self, x):
 
        self.data.appendleft(x)
 

	
 
    def append(self, x):
 
        self.data.append(x)
 

	
 
    def extend(self, o):
 
        self.data.extend(o)
 

	
 
    def __getitem__(self, i):
 
        return self.data[i]
 

	
 

	
 
class SubprocessIOChunker():
 
    '''
 
    Processor class wrapping handling of subprocess IO.
 

	
 
    In a way, this is a "communicate()" replacement with a twist.
 

	
 
    - We are multithreaded. Writing in and reading out, err are all sep threads.
 
    - We support concurrent (in and out) stream processing.
 
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" it.
    - We are non-blocking in more respects than communicate()
      (reading from the subprocess's stdout pauses when the internal buffer is
       full, but does not block the parent calling code. On the flip side, reading
       from a slow-yielding subprocess may block the iteration until data shows up;
       this does not block the inpipe reading happening in a parallel thread.)
 

	
 
    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of its stream-processing ability, the application does not have
    to read ALL of the subprocess's output and buffer it before handing it to the
    WSGI server for the HTTP response. Instead, the class initializer reads just a
    bit of the stream to figure out if an error occurred or is likely to occur,
    and if not, it hands further iteration over the subprocess output to the
    server for completion of the HTTP response.
 

	
 
    The real or perceived subprocess error is trapped and raised as one of
    the EnvironmentError family of exceptions.
 

	
 
    Example usage:
 
    #    try:
 
    #        answer = SubprocessIOChunker(
 
    #            cmd,
 
    #            input,
 
    #            buffer_size = 65536,
 
    #            chunk_size = 4096
 
    #            )
 
    #    except (EnvironmentError) as e:
 
    #        print str(e)
 
    #        raise e
 
    #
 
    #    return answer
 

	
 

	
 
    '''
 
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
 
                 chunk_size=4096, starting_values=[]):
 
        '''
 
        Initializes SubprocessIOChunker
 

	
 
        @param cmd A Subprocess.Popen style "cmd". Can be string or array of strings
 
        @param inputstream (Default: None) A file-like, string, or file pointer.
 
        @param buffer_size (Default: 65536) A size of total buffer per stream in bytes.
 
        @param chunk_size (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
 
        @param starting_values (Default: []) An array of strings to put in front of the output queue.
 
        '''
 

	
 
        if inputstream:
 
            input_streamer = StreamFeeder(inputstream)
 
            input_streamer.start()
 
            inputstream = input_streamer.output
 

	
 
        _p = subprocess.Popen(cmd,
 
            bufsize=-1,
 
            shell=True,
 
            stdin=inputstream,
 
            stdout=subprocess.PIPE,
 
            stderr=subprocess.PIPE
 
            )
 

	
 
        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values)
 
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
 

	
 
        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
 
            # doing this until we reach either end of file, or end of buffer.
 
            bg_out.data_added_event.wait(1)
 
            bg_out.data_added_event.clear()
 

	
 
        # at this point it's still ambiguous if we are done reading or just full buffer.
 
        # Either way, if error (returned by ended process, or implied based on
 
        # presence of stuff in stderr output) we error out.
 
        # Else, we are happy.
 
        _returncode = _p.poll()
 
        if _returncode or (_returncode is None and bg_err.length):
 
            try:
 
                _p.terminate()
 
            except:
 
                pass
 
            bg_out.stop()
 
            bg_err.stop()
 
            raise EnvironmentError("Subprocess exited due to an error.\n" + "".join(bg_err))
 

	
 
        self.process = _p
 
        self.output = bg_out
 
        self.error = bg_err
 

	
 
    def __iter__(self):
 
        return self
 

	
 
    def next(self):
 
        if self.process.poll():
 
            raise EnvironmentError("Subprocess exited due to an error:\n" + ''.join(self.error))
 
        return self.output.next()
 

	
 
    def throw(self, type, value=None, traceback=None):
 
        if self.output.length or not self.output.done_reading:
 
            raise type(value)
 

	
 
    def close(self):
 
        try:
 
            self.process.terminate()
 
        except:
 
            pass
 
        try:
 
            self.output.close()
 
        except:
 
            pass
 
        try:
 
            self.error.close()
 
        except:
 
            pass
 

	
 
    def __del__(self):
 
        self.close()