@ -5,6 +5,7 @@
""" Unit tests for subprocess2.py. """
""" Unit tests for subprocess2.py. """
import logging
import optparse
import optparse
import os
import os
import sys
import sys
@ -19,6 +20,14 @@ import subprocess2
# Method could be a function
# Method could be a function
# pylint: disable=R0201
# pylint: disable=R0201
def convert ( string ) :
""" Converts string to CRLF on Windows. """
if sys . platform == ' win32 ' :
return string . replace ( ' \n ' , ' \r \n ' )
return string
class Subprocess2Test ( unittest . TestCase ) :
class Subprocess2Test ( unittest . TestCase ) :
# Can be mocked in a test.
# Can be mocked in a test.
TO_SAVE = {
TO_SAVE = {
@ -147,13 +156,24 @@ class Subprocess2Test(unittest.TestCase):
self . assertEquals ( expected , results )
self . assertEquals ( expected , results )
def test_timeout ( self ) :
def test_timeout ( self ) :
# It'd be better to not discard stdout.
out , returncode = subprocess2 . communicate (
out , returncode = subprocess2 . communicate (
self . exe + [ ' --sleep ' , ' --stdout ' ] ,
self . exe + [ ' --sleep ' , ' --stdout ' ] ,
timeout = 0.01 ,
timeout = 0.01 ,
stdout = subprocess2 . PIPE )
stdout = subprocess2 . PIPE ,
shell = False )
self . assertEquals ( subprocess2 . TIMED_OUT , returncode )
self . assertEquals ( subprocess2 . TIMED_OUT , returncode )
self . assertEquals ( [ ' ' , None ] , out )
self . assertEquals ( ( ' ' , None ) , out )
def test_timeout_shell_throws ( self ) :
try :
subprocess2 . communicate (
self . exe + [ ' --sleep ' , ' --stdout ' ] ,
timeout = 0.01 ,
stdout = subprocess2 . PIPE ,
shell = True )
self . fail ( )
except TypeError :
pass
def test_check_output_no_stdout ( self ) :
def test_check_output_no_stdout ( self ) :
try :
try :
@ -168,10 +188,7 @@ class Subprocess2Test(unittest.TestCase):
stdout = subprocess2 . VOID ,
stdout = subprocess2 . VOID ,
stderr = subprocess2 . PIPE )
stderr = subprocess2 . PIPE )
self . assertEquals ( None , out )
self . assertEquals ( None , out )
expected = ' a \n bb \n ccc \n '
self . assertEquals ( convert ( ' a \n bb \n ccc \n ' ) , err )
if sys . platform == ' win32 ' :
expected = expected . replace ( ' \n ' , ' \r \n ' )
self . assertEquals ( expected , err )
self . assertEquals ( 0 , code )
self . assertEquals ( 0 , code )
def test_stderr_void ( self ) :
def test_stderr_void ( self ) :
@ -235,6 +252,66 @@ class Subprocess2Test(unittest.TestCase):
self . assertEquals ( None , e . stderr )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . assertEquals ( 64 , e . returncode )
def test_check_output_tee_stderr ( self ) :
stderr = [ ]
out , returncode = subprocess2 . communicate (
self . exe + [ ' --stderr ' ] , stderr = stderr . append )
self . assertEquals ( convert ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def test_check_output_tee_stdout_stderr ( self ) :
stdout = [ ]
stderr = [ ]
out , returncode = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --stderr ' ] ,
stdout = stdout . append ,
stderr = stderr . append )
self . assertEquals ( convert ( ' A \n BB \n CCC \n ' ) , ' ' . join ( stdout ) )
self . assertEquals ( convert ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def test_check_output_tee_stdin ( self ) :
stdout = [ ]
stdin = ' 0123456789 '
out , returncode = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --read ' ] , stdin = stdin , stdout = stdout . append )
self . assertEquals ( convert ( ' A \n BB \n CCC \n ' ) , ' ' . join ( stdout ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def test_check_output_tee_throw ( self ) :
stderr = [ ]
try :
subprocess2 . check_output (
self . exe + [ ' --stderr ' , ' --fail ' ] , stderr = stderr . append )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( convert ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def test_check_output_tee_large ( self ) :
stdout = [ ]
# Read 128kb. On my workstation it takes >2s. Welcome to 2011.
out , returncode = subprocess2 . communicate (
self . exe + [ ' --large ' ] , stdout = stdout . append )
self . assertEquals ( 128 * 1024 , len ( ' ' . join ( stdout ) ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def test_check_output_tee_large_stdin ( self ) :
stdout = [ ]
# Write 128kb.
stdin = ' 0123456789abcdef ' * ( 8 * 1024 )
out , returncode = subprocess2 . communicate (
self . exe + [ ' --large ' , ' --read ' ] , stdin = stdin , stdout = stdout . append )
self . assertEquals ( 128 * 1024 , len ( ' ' . join ( stdout ) ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def child_main ( args ) :
def child_main ( args ) :
parser = optparse . OptionParser ( )
parser = optparse . OptionParser ( )
@ -247,9 +324,13 @@ def child_main(args):
parser . add_option ( ' --stdout ' , action = ' store_true ' )
parser . add_option ( ' --stdout ' , action = ' store_true ' )
parser . add_option ( ' --stderr ' , action = ' store_true ' )
parser . add_option ( ' --stderr ' , action = ' store_true ' )
parser . add_option ( ' --sleep ' , action = ' store_true ' )
parser . add_option ( ' --sleep ' , action = ' store_true ' )
parser . add_option ( ' --large ' , action = ' store_true ' )
parser . add_option ( ' --read ' , action = ' store_true ' )
options , args = parser . parse_args ( args )
options , args = parser . parse_args ( args )
if args :
if args :
parser . error ( ' Internal error ' )
parser . error ( ' Internal error ' )
if options . sleep :
time . sleep ( 10 )
def do ( string ) :
def do ( string ) :
if options . stdout :
if options . stdout :
@ -260,12 +341,23 @@ def child_main(args):
do ( ' A ' )
do ( ' A ' )
do ( ' BB ' )
do ( ' BB ' )
do ( ' CCC ' )
do ( ' CCC ' )
if options . sleep :
if options . large :
time . sleep ( 10 )
# Print 128kb.
string = ' 0123456789abcdef ' * ( 8 * 1024 )
sys . stdout . write ( string )
if options . read :
try :
while sys . stdin . read ( ) :
pass
except OSError :
pass
return options . return_value
return options . return_value
if __name__ == ' __main__ ' :
if __name__ == ' __main__ ' :
logging . basicConfig ( level =
[ logging . WARNING , logging . INFO , logging . DEBUG ] [
min ( 2 , sys . argv . count ( ' -v ' ) ) ] )
if len ( sys . argv ) > 1 and sys . argv [ 1 ] == ' --child ' :
if len ( sys . argv ) > 1 and sys . argv [ 1 ] == ' --child ' :
sys . exit ( child_main ( sys . argv [ 2 : ] ) )
sys . exit ( child_main ( sys . argv [ 2 : ] ) )
unittest . main ( )
unittest . main ( )