Implement accelerated tee support for POSIX.

This removes the need for threading entirely, which eliminates contention on
the GIL.

Taking S2Test.test_check_output_tee_large as baseline, numbers are
real/user/sys in seconds:
Ubuntu workstation: ~25x  2.4/1.9/1.5 -> 0.10/0.70/0.02
OSX 10.6 laptop:    ~40x  6.4/5.3/3.9 -> 0.15/0.80/0.07
Cygwin on win7:      ~4x  2.8/2.2/1.3 -> 0.60/0.16/0.30

R=dpranke@chromium.org
BUG=
TEST=


Review URL: http://codereview.chromium.org/8462008

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@109283 0039d316-1c4b-4281-b951-d872f2087c98
experimental/szager/collated-output
maruel@chromium.org 14 years ago
parent f2dca4e174
commit fefff18b55

@ -13,11 +13,16 @@ import errno
import logging import logging
import os import os
import Queue import Queue
import select
import subprocess import subprocess
import sys import sys
import time import time
import threading import threading
if sys.platform != 'win32':
import fcntl
# Constants forwarded from subprocess. # Constants forwarded from subprocess.
PIPE = subprocess.PIPE PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT STDOUT = subprocess.STDOUT
@ -203,7 +208,7 @@ def Popen(args, **kwargs):
def _queue_pipe_read(pipe, name, done, dest): def _queue_pipe_read(pipe, name, done, dest):
"""Queue characters read from a pipe into a queue. """Queues characters read from a pipe into a queue.
Left outside the _tee_threads function to not introduce a function closure Left outside the _tee_threads function to not introduce a function closure
to speed up variable lookup. to speed up variable lookup.
@ -336,6 +341,80 @@ def _tee_threads(proc, timeout, start, stdin, args, kwargs):
return proc.returncode return proc.returncode
def _read_pipe(handles, pipe, out_fn):
"""Reads bytes from a pipe and calls the output callback."""
data = pipe.read()
if not data:
del handles[pipe]
else:
out_fn(data)
def _tee_posix(proc, timeout, start, stdin, args, kwargs):
"""Polls a process and its pipe using select.select().
TODO(maruel): Implement a non-polling method for OSes that support it.
"""
handles_r = {}
if callable(kwargs.get('stdout')):
handles_r[proc.stdout] = lambda: _read_pipe(
handles_r, proc.stdout, kwargs['stdout'])
if callable(kwargs.get('stderr')):
handles_r[proc.stderr] = lambda: _read_pipe(
handles_r, proc.stderr, kwargs['stderr'])
handles_w = {}
if isinstance(stdin, str):
stdin_io = cStringIO.StringIO(stdin)
def write_stdin():
data = stdin_io.read(1)
if data:
proc.stdin.write(data)
else:
del handles_w[proc.stdin]
proc.stdin.close()
handles_w[proc.stdin] = write_stdin
else:
# TODO(maruel): Fix me, it could be VOID.
assert stdin is None
# Make all the file objects of the child process non-blocking file.
# TODO(maruel): Test if a pipe is handed to the child process.
for pipe in (proc.stdin, proc.stdout, proc.stderr):
fileno = pipe and getattr(pipe, 'fileno', lambda: None)()
if fileno:
# Note: making a pipe non-blocking means the C stdio could act wrong. In
# particular, readline() cannot be used. Work around is to use os.read().
fl = fcntl.fcntl(fileno, fcntl.F_GETFL)
fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK)
timed_out = False
while handles_r or handles_w or (timeout and proc.poll() is None):
period = None
if timeout:
period = max(0, timeout - (time.time() - start))
if not period and not timed_out:
proc.kill()
timed_out = True
if timed_out:
period = 0.001
# It reconstructs objects on each loop, not very efficient.
reads, writes, _, = select.select(
handles_r.keys(), handles_w.keys(), [], period)
for read in reads:
handles_r[read]()
for write in writes:
handles_w[write]()
# No pipe open anymore and if there was a time out, the child process was
# killed already.
proc.wait()
if timed_out:
return TIMED_OUT
return proc.returncode
def communicate(args, timeout=None, **kwargs): def communicate(args, timeout=None, **kwargs):
"""Wraps subprocess.Popen().communicate(). """Wraps subprocess.Popen().communicate().
@ -386,7 +465,12 @@ def communicate(args, timeout=None, **kwargs):
if kwargs.get('stderr') == PIPE: if kwargs.get('stderr') == PIPE:
stderr = [] stderr = []
kwargs['stderr'] = stderr.append kwargs['stderr'] = stderr.append
returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs) if sys.platform == 'win32':
# On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE,
# doesn't exist so _tee_win() cannot be used yet.
returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
else:
returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs)
if not stdout is None: if not stdout is None:
stdout = ''.join(stdout) stdout = ''.join(stdout)
if not stderr is None: if not stderr is None:

@ -12,6 +12,11 @@ import sys
import time import time
import unittest import unittest
try:
import fcntl
except ImportError:
fcntl = None
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, ROOT_DIR) sys.path.insert(0, ROOT_DIR)
@ -182,6 +187,16 @@ class S2Test(unittest.TestCase):
super(S2Test, self).setUp() super(S2Test, self).setUp()
self.exe_path = __file__ self.exe_path = __file__
self.exe = [sys.executable, self.exe_path, '--child'] self.exe = [sys.executable, self.exe_path, '--child']
self.states = {}
if fcntl:
for v in (sys.stdin, sys.stdout, sys.stderr):
fileno = v.fileno()
self.states[fileno] = fcntl.fcntl(fileno, fcntl.F_GETFL)
def tearDown(self):
  """Checks that the test left the recorded file status flags unchanged.

  self.states maps a file descriptor to the F_GETFL flags recorded for it in
  setUp(). It is empty when fcntl is unavailable, so nothing is checked then.
  """
  try:
    for fileno, fl in self.states.iteritems():
      self.assertEqual(fl, fcntl.fcntl(fileno, fcntl.F_GETFL))
  finally:
    # Run the parent tearDown even if a flag check above failed.
    super(S2Test, self).tearDown()
def _run_test(self, function): def _run_test(self, function):
"""Runs tests in 6 combinations: """Runs tests in 6 combinations:

Loading…
Cancel
Save