|
|
|
@ -380,6 +380,42 @@ class S2Test(BaseTestCase):
|
|
|
|
|
pass
|
|
|
|
|
self._run_test(fn)
|
|
|
|
|
|
|
|
|
|
def test_stdin(self):
|
|
|
|
|
def fn(c, e, un):
|
|
|
|
|
stdin = '0123456789'
|
|
|
|
|
res = subprocess2.communicate(
|
|
|
|
|
e + ['--read'],
|
|
|
|
|
stdin=stdin,
|
|
|
|
|
universal_newlines=un)
|
|
|
|
|
self._check_res(res, None, None, 10)
|
|
|
|
|
self._run_test(fn)
|
|
|
|
|
|
|
|
|
|
def test_stdin_unicode(self):
|
|
|
|
|
def fn(c, e, un):
|
|
|
|
|
stdin = u'0123456789'
|
|
|
|
|
res = subprocess2.communicate(
|
|
|
|
|
e + ['--read'],
|
|
|
|
|
stdin=stdin,
|
|
|
|
|
universal_newlines=un)
|
|
|
|
|
self._check_res(res, None, None, 10)
|
|
|
|
|
self._run_test(fn)
|
|
|
|
|
|
|
|
|
|
def test_stdin_void(self):
|
|
|
|
|
res = subprocess2.communicate(self.exe + ['--read'], stdin=VOID)
|
|
|
|
|
self._check_res(res, None, None, 0)
|
|
|
|
|
|
|
|
|
|
def test_stdin_void_stdout_timeout(self):
|
|
|
|
|
# Make sure a mix of VOID, PIPE and timeout works.
|
|
|
|
|
def fn(c, e, un):
|
|
|
|
|
res = subprocess2.communicate(
|
|
|
|
|
e + ['--stdout', '--read'],
|
|
|
|
|
stdin=VOID,
|
|
|
|
|
stdout=PIPE,
|
|
|
|
|
timeout=10,
|
|
|
|
|
universal_newlines=un)
|
|
|
|
|
self._check_res(res, c('A\nBB\nCCC\n'), None, 0)
|
|
|
|
|
self._run_test(fn)
|
|
|
|
|
|
|
|
|
|
def test_stdout_void(self):
|
|
|
|
|
def fn(c, e, un):
|
|
|
|
|
res = subprocess2.communicate(
|
|
|
|
@ -435,21 +471,26 @@ class S2Test(BaseTestCase):
|
|
|
|
|
|
|
|
|
|
def test_tee_stdin(self):
|
|
|
|
|
def fn(c, e, un):
|
|
|
|
|
# Mix of stdin input and stdout callback.
|
|
|
|
|
stdout = []
|
|
|
|
|
stdin = '0123456789'
|
|
|
|
|
res = subprocess2.communicate(
|
|
|
|
|
e + ['--stdout', '--read'], stdin=stdin, stdout=stdout.append,
|
|
|
|
|
e + ['--stdout', '--read'],
|
|
|
|
|
stdin=stdin,
|
|
|
|
|
stdout=stdout.append,
|
|
|
|
|
universal_newlines=un)
|
|
|
|
|
self.assertEquals(c('A\nBB\nCCC\n'), ''.join(stdout))
|
|
|
|
|
self._check_res(res, None, None, 0)
|
|
|
|
|
self._check_res(res, None, None, 10)
|
|
|
|
|
self._run_test(fn)
|
|
|
|
|
|
|
|
|
|
def test_tee_throw(self):
|
|
|
|
|
def fn(c, e, un):
|
|
|
|
|
# Make sure failure still returns stderr completely.
|
|
|
|
|
stderr = []
|
|
|
|
|
try:
|
|
|
|
|
subprocess2.check_output(
|
|
|
|
|
e + ['--stderr', '--fail'], stderr=stderr.append,
|
|
|
|
|
e + ['--stderr', '--fail'],
|
|
|
|
|
stderr=stderr.append,
|
|
|
|
|
universal_newlines=un)
|
|
|
|
|
self.fail()
|
|
|
|
|
except subprocess2.CalledProcessError, e:
|
|
|
|
@ -580,9 +621,10 @@ def child_main(args):
|
|
|
|
|
string = '0123456789abcdef' * (8*1024)
|
|
|
|
|
sys.stdout.write(string)
|
|
|
|
|
if options.read:
|
|
|
|
|
assert options.return_value is 0
|
|
|
|
|
try:
|
|
|
|
|
while sys.stdin.read():
|
|
|
|
|
pass
|
|
|
|
|
while sys.stdin.read(1):
|
|
|
|
|
options.return_value += 1
|
|
|
|
|
except OSError:
|
|
|
|
|
pass
|
|
|
|
|
if options.sleep_last:
|
|
|
|
|