Changeset - 71fc1c98e02a
[Not reviewed]
beta
0 1 0
Marcin Kuzminski - 13 years ago 2012-07-07 19:35:18
marcin@python-works.com
dont format errors to string in subprocessio
1 file changed with 4 insertions and 2 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/subprocessio.py
Show inline comments
 
@@ -274,130 +274,132 @@ class BufferedGenerator():
 
        self.data.extend(o)
 

	
 
    def __getitem__(self, i):
 
        return self.data[i]
 

	
 

	
 
class SubprocessIOChunker(object):
    '''
    Processor class wrapping handling of subprocess IO.

    In a way, this is a "communicate()" replacement with a twist.

    - We are multithreaded. Writing in and reading out, err are all sep threads.
    - We support concurrent (in and out) stream processing.
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
    - We are non-blocking in more respects than communicate()
      (reading from subprocess out pauses when internal buffer is full, but
       does not block the parent calling code. On the flip side, reading from
       slow-yielding subprocess may block the iteration until data shows up. This
       does not block the parallel inpipe reading occurring parallel thread.)

    The purpose of the object is to allow us to wrap subprocess interactions into
    an iterable that can be passed to a WSGI server as the application's return
    value. Because of stream-processing-ability, WSGI does not have to read ALL
    of the subprocess's output and buffer it, before handing it to WSGI server for
    HTTP response. Instead, the class initializer reads just a bit of the stream
    to figure out if error occurred or likely to occur and if not, just hands the
    further iteration over subprocess output to the server for completion of HTTP
    response.

    The real or perceived subprocess error is trapped and raised as one of
    EnvironmentError family of exceptions

    Example usage:
    #    try:
    #        answer = SubprocessIOChunker(
    #            cmd,
    #            input,
    #            buffer_size = 65536,
    #            chunk_size = 4096
    #            )
    #    except (EnvironmentError) as e:
    #        print str(e)
    #        raise e
    #
    #    return answer

    '''

    def __init__(self, cmd, inputstream=None, buffer_size=65536,
                 chunk_size=4096, starting_values=None, **kwargs):
        '''
        Initializes SubprocessIOChunker

        Starts the subprocess, begins buffering its stdout/stderr and waits
        until enough is known to decide whether the command failed early.

        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
        :param inputstream: (Default: None) A file-like, string, or file pointer.
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
        :param starting_values: (Default: None, meaning an empty list) An array
            of strings to put in front of output queue.
        :param kwargs: Extra keyword arguments forwarded to subprocess.Popen.
        :raises EnvironmentError: if the subprocess already exited with a
            non-zero return code, or is still running but has written to stderr.
        '''
        # A fresh list per call; a literal [] default would be shared between
        # every instance created without an explicit value.
        if starting_values is None:
            starting_values = []

        if inputstream:
            # Feed the input through a StreamFeeder and hand its ``output``
            # to the subprocess as stdin.
            input_streamer = StreamFeeder(inputstream)
            input_streamer.start()
            inputstream = input_streamer.output

        # NOTE(review): shell=True means ``cmd`` is interpreted by the shell.
        # Callers must make sure it contains no unescaped untrusted input.
        _p = subprocess.Popen(cmd,
            bufsize=-1,
            shell=True,
            stdin=inputstream,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            **kwargs
            )

        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size, starting_values)
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)

        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
            # doing this until we reach either end of file, or end of buffer.
            # The 1 second timeout makes us re-check the conditions even if no
            # "data added" notification arrives.
            bg_out.data_added_event.wait(1)
            bg_out.data_added_event.clear()

        # at this point it's still ambiguous if we are done reading or just full buffer.
        # Either way, if error (returned by ended process, or implied based on
        # presence of stuff in stderr output) we error out.
        # Else, we are happy.
        _returncode = _p.poll()
        if _returncode or (_returncode is None and bg_err.length):
            try:
                _p.terminate()
            except OSError:
                # Best effort only: the process may already be gone.
                pass
            bg_out.stop()
            bg_err.stop()
            # repr() keeps control characters / non-ASCII bytes of the stderr
            # output from leaking raw into the exception message.
            err = repr(''.join(bg_err))
            raise EnvironmentError("Subprocess exited due to an error.\n" + err)

        self.process = _p
        self.output = bg_out
        self.error = bg_err

    def __iter__(self):
        '''Return self; the chunker is its own iterator.'''
        return self

    def next(self):
        '''
        Return the next chunk of subprocess output.

        :raises EnvironmentError: if the subprocess has exited with a non-zero
            return code; the message carries the repr() of the collected
            stderr output.
        '''
        if self.process.poll():
            err = repr(''.join(self.error))
            raise EnvironmentError("Subprocess exited due to an error:\n" + err)
        return self.output.next()

    def throw(self, type, value=None, traceback=None):
        '''
        Raise ``type(value)`` if output is still pending or still being read.

        Does nothing when the output buffer is empty and reading is finished.
        The ``traceback`` argument is accepted but not used.
        '''
        if self.output.length or not self.output.done_reading:
            raise type(value)

    def close(self):
        '''
        Terminate the subprocess and close both buffers, ignoring failures.

        Each step is attempted independently so one failure (for example an
        already-exited process, or an attribute missing because __init__
        raised before it was set) does not prevent the remaining cleanup.
        '''
        for holder, action in (('process', 'terminate'),
                               ('output', 'close'),
                               ('error', 'close')):
            try:
                getattr(getattr(self, holder), action)()
            except Exception:
                # Deliberate best-effort cleanup; also runs from __del__.
                pass

    def __del__(self):
        '''Release the subprocess and buffers when the object is collected.'''
        self.close()
0 comments (0 inline, 0 general)