@ -5,18 +5,12 @@
""" Unit tests for subprocess2.py. """
import logging
import optparse
import os
import sys
import time
import unittest
try :
import fcntl
except ImportError :
fcntl = None
ROOT_DIR = os . path . dirname ( os . path . dirname ( os . path . abspath ( __file__ ) ) )
sys . path . insert ( 0 , ROOT_DIR )
@ -25,25 +19,7 @@ import subprocess2
# Method could be a function
# pylint: disable=R0201
def convert_to_crlf ( string ) :
""" Unconditionally convert LF to CRLF. """
return string . replace ( ' \n ' , ' \r \n ' )
def convert_to_cr ( string ) :
""" Unconditionally convert LF to CR. """
return string . replace ( ' \n ' , ' \r ' )
def convert_win ( string ) :
""" Converts string to CRLF on Windows only. """
if sys . platform == ' win32 ' :
return string . replace ( ' \n ' , ' \r \n ' )
return string
class DefaultsTest ( unittest . TestCase ) :
class Subprocess2Test ( unittest . TestCase ) :
# Can be mocked in a test.
TO_SAVE = {
subprocess2 : [
@ -52,6 +28,8 @@ class DefaultsTest(unittest.TestCase):
}
def setUp ( self ) :
self . exe_path = __file__
self . exe = [ sys . executable , self . exe_path , ' --child ' ]
self . saved = { }
for module , names in self . TO_SAVE . iteritems ( ) :
self . saved [ module ] = dict (
@ -168,67 +146,14 @@ class DefaultsTest(unittest.TestCase):
}
self . assertEquals ( expected , results )
def test_timeout_shell_throws ( self ) :
# Never called.
_ = self . _fake_Popen ( )
try :
subprocess2 . communicate (
sys . executable ,
timeout = 0.01 ,
stdout = subprocess2 . PIPE ,
shell = True )
self . fail ( )
except TypeError :
pass
class S2Test ( unittest . TestCase ) :
def setUp ( self ) :
super ( S2Test , self ) . setUp ( )
self . exe_path = __file__
self . exe = [ sys . executable , self . exe_path , ' --child ' ]
self . states = { }
if fcntl :
for v in ( sys . stdin , sys . stdout , sys . stderr ) :
fileno = v . fileno ( )
self . states [ fileno ] = fcntl . fcntl ( fileno , fcntl . F_GETFL )
def tearDown ( self ) :
for fileno , fl in self . states . iteritems ( ) :
self . assertEquals ( fl , fcntl . fcntl ( fileno , fcntl . F_GETFL ) )
super ( S2Test , self ) . tearDown ( )
def _run_test ( self , function ) :
""" Runs tests in 6 combinations:
- LF output with universal_newlines = False
- CR output with universal_newlines = False
- CRLF output with universal_newlines = False
- LF output with universal_newlines = True
- CR output with universal_newlines = True
- CRLF output with universal_newlines = True
First | function | argument is the convertion for the origianl expected LF
string to the right EOL .
Second | function | argument is the executable and initial flag to run , to
control what EOL is used by the child process .
Third | function | argument is universal_newlines value .
"""
noop = lambda x : x
function ( noop , self . exe , False )
function ( convert_to_cr , self . exe + [ ' --cr ' ] , False )
function ( convert_to_crlf , self . exe + [ ' --crlf ' ] , False )
function ( noop , self . exe , True )
function ( noop , self . exe + [ ' --cr ' ] , True )
function ( noop , self . exe + [ ' --crlf ' ] , True )
def test_timeout ( self ) :
# It'd be better to not discard stdout.
out , returncode = subprocess2 . communicate (
self . exe + [ ' --sleep _first ' , ' --stdout ' ] ,
self . exe + [ ' --sleep ' , ' --stdout ' ] ,
timeout = 0.01 ,
stdout = subprocess2 . PIPE ,
shell = False )
stdout = subprocess2 . PIPE )
self . assertEquals ( subprocess2 . TIMED_OUT , returncode )
self . assertEquals ( (' ' , None ) , out )
self . assertEquals ( [ ' ' , None ] , out )
def test_check_output_no_stdout ( self ) :
try :
@ -238,78 +163,68 @@ class S2Test(unittest.TestCase):
pass
def test_stdout_void ( self ) :
def fn ( c , e , un ) :
( out , err ) , code = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . VOID ,
stderr = subprocess2 . PIPE ,
universal_newlines = un )
self . assertEquals ( None , out )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , err )
self . assertEquals ( 0 , code )
self . _run_test( fn )
( out , err ) , code = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . VOID ,
stderr = subprocess2 . PIPE )
self . assertEquals ( None , out )
expected = ' a \n bb \n ccc \n '
if sys . platform == ' win32 ' :
expected = expected . replace ( ' \n ' , ' \r \n ' )
self . assertEquals ( expected , err )
self . assertEquals( 0 , code )
def test_stderr_void ( self ) :
def fn ( c , e , un ) :
( out , err ) , code = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . VOID ,
universal_newlines = un )
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , out )
self . assertEquals ( None , err )
self . assertEquals ( 0 , code )
self . _run_test ( fn )
( out , err ) , code = subprocess2 . communicate (
self . exe + [ ' --stdout ' , ' --stderr ' ] ,
universal_newlines = True ,
stdout = subprocess2 . PIPE ,
stderr = subprocess2 . VOID )
self . assertEquals ( ' A \n BB \n CCC \n ' , out )
self . assertEquals ( None , err )
self . assertEquals ( 0 , code )
def test_check_output_throw_stdout ( self ) :
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stdout ' ] , universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stdout ' ] , universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' A \n BB \n CCC \n ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def test_check_output_throw_no_stderr ( self ) :
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stderr ' ] , universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' ' ) , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stderr ' ] , universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def test_check_output_throw_stderr ( self ) :
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . PIPE ,
universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . PIPE ,
universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( ' a \n bb \n ccc \n ' , e . stderr )
self . assertEquals ( 64 , e . returncode )
def test_check_output_throw_stderr_stdout ( self ) :
def fn ( c , e , un ) :
try :
subprocess2 . check_output (
e + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . STDOUT ,
universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
try :
subprocess2 . check_output (
self . exe + [ ' --fail ' , ' --stderr ' ] , stderr = subprocess2 . STDOUT ,
universal_newlines = True )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( ' a \n bb \n ccc \n ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def test_check_call_throw ( self ) :
try :
@ -320,87 +235,8 @@ class S2Test(unittest.TestCase):
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
def test_check_output_tee_stderr ( self ) :
def fn ( c , e , un ) :
stderr = [ ]
out , returncode = subprocess2 . communicate (
e + [ ' --stderr ' ] , stderr = stderr . append ,
universal_newlines = un )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
self . _run_test ( fn )
def test_check_output_tee_stdout_stderr ( self ) :
def fn ( c , e , un ) :
stdout = [ ]
stderr = [ ]
out , returncode = subprocess2 . communicate (
e + [ ' --stdout ' , ' --stderr ' ] ,
stdout = stdout . append ,
stderr = stderr . append ,
universal_newlines = un )
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , ' ' . join ( stdout ) )
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
self . _run_test ( fn )
def test_check_output_tee_stdin ( self ) :
def fn ( c , e , un ) :
stdout = [ ]
stdin = ' 0123456789 '
out , returncode = subprocess2 . communicate (
e + [ ' --stdout ' , ' --read ' ] , stdin = stdin , stdout = stdout . append ,
universal_newlines = un )
self . assertEquals ( c ( ' A \n BB \n CCC \n ' ) , ' ' . join ( stdout ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
self . _run_test ( fn )
def test_check_output_tee_throw ( self ) :
def fn ( c , e , un ) :
stderr = [ ]
try :
subprocess2 . check_output (
e + [ ' --stderr ' , ' --fail ' ] , stderr = stderr . append ,
universal_newlines = un )
self . fail ( )
except subprocess2 . CalledProcessError , e :
self . assertEquals ( c ( ' a \n bb \n ccc \n ' ) , ' ' . join ( stderr ) )
self . assertEquals ( ' ' , e . stdout )
self . assertEquals ( None , e . stderr )
self . assertEquals ( 64 , e . returncode )
self . _run_test ( fn )
def test_check_output_tee_large ( self ) :
stdout = [ ]
# Read 128kb. On my workstation it takes >2s. Welcome to 2011.
out , returncode = subprocess2 . communicate (
self . exe + [ ' --large ' ] , stdout = stdout . append )
self . assertEquals ( 128 * 1024 , len ( ' ' . join ( stdout ) ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def test_check_output_tee_large_stdin ( self ) :
stdout = [ ]
# Write 128kb.
stdin = ' 0123456789abcdef ' * ( 8 * 1024 )
out , returncode = subprocess2 . communicate (
self . exe + [ ' --large ' , ' --read ' ] , stdin = stdin , stdout = stdout . append )
self . assertEquals ( 128 * 1024 , len ( ' ' . join ( stdout ) ) )
self . assertEquals ( ( None , None ) , out )
self . assertEquals ( 0 , returncode )
def child_main ( args ) :
if sys . platform == ' win32 ' :
# Annoying, make sure the output is not translated on Windows.
# pylint: disable=E1101,F0401
import msvcrt
msvcrt . setmode ( sys . stdout . fileno ( ) , os . O_BINARY )
msvcrt . setmode ( sys . stderr . fileno ( ) , os . O_BINARY )
parser = optparse . OptionParser ( )
parser . add_option (
' --fail ' ,
@ -408,52 +244,28 @@ def child_main(args):
action = ' store_const ' ,
default = 0 ,
const = 64 )
parser . add_option (
' --crlf ' , action = ' store_const ' , const = ' \r \n ' , dest = ' eol ' , default = ' \n ' )
parser . add_option (
' --cr ' , action = ' store_const ' , const = ' \r ' , dest = ' eol ' )
parser . add_option ( ' --stdout ' , action = ' store_true ' )
parser . add_option ( ' --stderr ' , action = ' store_true ' )
parser . add_option ( ' --sleep_first ' , action = ' store_true ' )
parser . add_option ( ' --sleep_last ' , action = ' store_true ' )
parser . add_option ( ' --large ' , action = ' store_true ' )
parser . add_option ( ' --read ' , action = ' store_true ' )
parser . add_option ( ' --sleep ' , action = ' store_true ' )
options , args = parser . parse_args ( args )
if args :
parser . error ( ' Internal error ' )
if options . sleep_first :
time . sleep ( 10 )
def do ( string ) :
if options . stdout :
sys . stdout . write ( string . upper ( ) )
sys . stdout . write ( options . eol )
print >> sys . stdout , string . upper ( )
if options . stderr :
sys . stderr . write ( string . lower ( ) )
sys . stderr . write ( options . eol )
print >> sys . stderr , string . lower ( )
do ( ' A ' )
do ( ' BB ' )
do ( ' CCC ' )
if options . large :
# Print 128kb.
string = ' 0123456789abcdef ' * ( 8 * 1024 )
sys . stdout . write ( string )
if options . read :
try :
while sys . stdin . read ( ) :
pass
except OSError :
pass
if options . sleep_last :
if options . sleep :
time . sleep ( 10 )
return options . return_value
if __name__ == ' __main__ ' :
logging . basicConfig ( level =
[ logging . WARNING , logging . INFO , logging . DEBUG ] [
min ( 2 , sys . argv . count ( ' -v ' ) ) ] )
if len ( sys . argv ) > 1 and sys . argv [ 1 ] == ' --child ' :
sys . exit ( child_main ( sys . argv [ 2 : ] ) )
unittest . main ( )