#!/usr/bin/env python
# Copyright (c) 2011 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unit tests for subprocess2.py."""

import optparse
import os
import sys
import time
import unittest

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, ROOT_DIR)

import subprocess2

# Method could be a function
# pylint: disable=R0201


def _make_fake_Popen(results):
  """Returns a Popen stand-in class that records its arguments in |results|.

  The constructor stores the positional argument under the 'args' key and
  merges every keyword argument into |results|. It asserts that |results| is
  still empty, so a second instantiation against the same dict fails loudly.
  """
  class fake_Popen(object):
    returncode = -8

    def __init__(self, args, **kwargs):
      assert not results
      results.update(kwargs)
      results['args'] = args

    def communicate(self):
      return None, None

  return fake_Popen


class Subprocess2Test(unittest.TestCase):
  """Tests for subprocess2, using mocks and a real child process.

  The child process is this very file, started with '--child' as its first
  argument; see child_main() for the flags it understands.
  """

  # Can be mocked in a test.
  TO_SAVE = {
    subprocess2: [
      'Popen', 'communicate', 'call', 'check_call', 'capture', 'check_output'],
    subprocess2.subprocess: ['Popen'],
  }

  def setUp(self):
    """Builds the child command line and snapshots the mockable attributes."""
    self.exe_path = __file__
    self.exe = [sys.executable, self.exe_path, '--child']
    self.saved = {}
    for module, names in self.TO_SAVE.items():
      self.saved[module] = dict(
          (name, getattr(module, name)) for name in names)
    # TODO(maruel): Do a reopen() on sys.__stdout__ and sys.__stderr__ so they
    # can be trapped in the child process for better coverage.

  def tearDown(self):
    """Restores every attribute snapshotted by setUp()."""
    for module, saved in self.saved.items():
      for name, value in saved.items():
        setattr(module, name, value)

  @staticmethod
  def _fake_communicate():
    """Replaces subprocess2.communicate with a recording fake.

    Returns the dict that will receive the fake's arguments once it is called.
    The fake returns (['stdout', 'stderr'], 0).
    """
    results = {}

    def fake_communicate(args, **kwargs):
      assert not results
      results.update(kwargs)
      results['args'] = args
      return ['stdout', 'stderr'], 0

    subprocess2.communicate = fake_communicate
    return results

  @staticmethod
  def _fake_Popen():
    """Replaces subprocess2.Popen with a recording fake.

    Returns the dict that will receive the constructor arguments.
    """
    results = {}
    subprocess2.Popen = _make_fake_Popen(results)
    return results

  @staticmethod
  def _fake_subprocess_Popen():
    """Replaces subprocess2.subprocess.Popen with a recording fake.

    Returns the dict that will receive the constructor arguments.
    """
    results = {}
    subprocess2.subprocess.Popen = _make_fake_Popen(results)
    return results

  def test_check_call_defaults(self):
    """check_call_out() forwards its arguments untouched to communicate()."""
    # NOTE(review): the test name mentions check_call but the call under test
    # is check_call_out(); confirm that this is intended.
    results = self._fake_communicate()
    self.assertEqual(
        ['stdout', 'stderr'], subprocess2.check_call_out(['foo'], a=True))
    expected = {
      'args': ['foo'],
      'a': True,
    }
    self.assertEqual(expected, results)

  def test_capture_defaults(self):
    """capture() adds stdin=VOID and stdout=PIPE and returns only stdout."""
    results = self._fake_communicate()
    self.assertEqual(
        'stdout', subprocess2.capture(['foo'], a=True))
    expected = {
      'args': ['foo'],
      'a': True,
      'stdin': subprocess2.VOID,
      'stdout': subprocess2.PIPE,
    }
    self.assertEqual(expected, results)

  def test_communicate_defaults(self):
    """communicate() passes arguments through to Popen unchanged."""
    results = self._fake_Popen()
    self.assertEqual(
        ((None, None), -8), subprocess2.communicate(['foo'], a=True))
    expected = {
      'args': ['foo'],
      'a': True,
    }
    self.assertEqual(expected, results)

  def test_Popen_defaults(self):
    """Popen() sets 'shell' per platform and forces an English locale."""
    results = self._fake_subprocess_Popen()
    proc = subprocess2.Popen(['foo'], a=True)
    self.assertEqual(-8, proc.returncode)
    expected = {
      'args': ['foo'],
      'a': True,
      'shell': sys.platform == 'win32',
    }
    if sys.platform != 'win32':
      env = os.environ.copy()

      def is_english(name):
        return env.get(name, 'en').startswith('en')

      # 'env' is only expected when at least one locale variable had to be
      # overridden.
      if not is_english('LANG'):
        env['LANG'] = 'en_US.UTF-8'
        expected['env'] = env
      if not is_english('LANGUAGE'):
        env['LANGUAGE'] = 'en_US.UTF-8'
        expected['env'] = env
    self.assertEqual(expected, results)

  def test_check_output_defaults(self):
    """check_output() adds stdin=VOID and stdout=PIPE and returns stdout."""
    results = self._fake_communicate()
    # It's discarding 'stderr' because it assumes stderr=subprocess2.STDOUT but
    # fake_communicate() doesn't 'implement' that.
    self.assertEqual('stdout', subprocess2.check_output(['foo'], a=True))
    expected = {
      'args': ['foo'],
      'a': True,
      'stdin': subprocess2.VOID,
      'stdout': subprocess2.PIPE,
    }
    self.assertEqual(expected, results)

  def test_timeout(self):
    """communicate() reports TIMED_OUT when the child outlives the timeout."""
    # It'd be better to not discard stdout.
    out, returncode = subprocess2.communicate(
        self.exe + ['--sleep', '--stdout'],
        timeout=0.01,
        stdout=subprocess2.PIPE)
    self.assertEqual(subprocess2.TIMED_OUT, returncode)
    self.assertEqual(['', None], out)

  def test_check_output_no_stdout(self):
    """check_output() raises TypeError when given an explicit stdout."""
    try:
      subprocess2.check_output(self.exe, stdout=subprocess2.PIPE)
      self.fail()
    except TypeError:
      pass

  def test_stdout_void(self):
    """stdout=VOID yields None for stdout while stderr is still captured."""
    (out, err), code = subprocess2.communicate(
        self.exe + ['--stdout', '--stderr'],
        stdout=subprocess2.VOID,
        stderr=subprocess2.PIPE)
    self.assertEqual(None, out)
    self.assertEqual('a\nbb\nccc\n', err)
    self.assertEqual(0, code)

  def test_stderr_void(self):
    """stderr=VOID yields None for stderr while stdout is still captured."""
    (out, err), code = subprocess2.communicate(
        self.exe + ['--stdout', '--stderr'],
        universal_newlines=True,
        stdout=subprocess2.PIPE,
        stderr=subprocess2.VOID)
    self.assertEqual('A\nBB\nCCC\n', out)
    self.assertEqual(None, err)
    self.assertEqual(0, code)

  def test_check_output_throw_stdout(self):
    """A failing child's stdout is attached to CalledProcessError."""
    try:
      subprocess2.check_output(
          self.exe + ['--fail', '--stdout'], universal_newlines=True)
      self.fail()
    except subprocess2.CalledProcessError as e:
      self.assertEqual('A\nBB\nCCC\n', e.stdout)
      self.assertEqual(None, e.stderr)
      self.assertEqual(64, e.returncode)

  def test_check_output_throw_no_stderr(self):
    """Without stderr redirection, the exception carries no stderr."""
    try:
      subprocess2.check_output(
          self.exe + ['--fail', '--stderr'], universal_newlines=True)
      self.fail()
    except subprocess2.CalledProcessError as e:
      self.assertEqual('', e.stdout)
      self.assertEqual(None, e.stderr)
      self.assertEqual(64, e.returncode)

  def test_check_output_throw_stderr(self):
    """With stderr=PIPE, the exception carries the child's stderr."""
    try:
      subprocess2.check_output(
          self.exe + ['--fail', '--stderr'], stderr=subprocess2.PIPE,
          universal_newlines=True)
      self.fail()
    except subprocess2.CalledProcessError as e:
      self.assertEqual('', e.stdout)
      self.assertEqual('a\nbb\nccc\n', e.stderr)
      self.assertEqual(64, e.returncode)

  def test_check_output_throw_stderr_stdout(self):
    """With stderr=STDOUT, the child's stderr shows up in e.stdout."""
    try:
      subprocess2.check_output(
          self.exe + ['--fail', '--stderr'], stderr=subprocess2.STDOUT,
          universal_newlines=True)
      self.fail()
    except subprocess2.CalledProcessError as e:
      self.assertEqual('a\nbb\nccc\n', e.stdout)
      self.assertEqual(None, e.stderr)
      self.assertEqual(64, e.returncode)

  def test_check_call_throw(self):
    """check_call() raises on failure without capturing any output."""
    try:
      subprocess2.check_call(self.exe + ['--fail', '--stderr'])
      self.fail()
    except subprocess2.CalledProcessError as e:
      self.assertEqual(None, e.stdout)
      self.assertEqual(None, e.stderr)
      self.assertEqual(64, e.returncode)


def child_main(args):
  """Entry point of the child process spawned by the tests.

  Recognized flags:
    --fail    return 64 instead of 0.
    --stdout  write 'A', 'BB' and 'CCC', one per line, to stdout.
    --stderr  write 'a', 'bb' and 'ccc', one per line, to stderr.
    --sleep   sleep for 10 seconds after writing.

  Any positional argument makes the option parser report an error.

  Returns:
    The exit code the process should use.
  """
  parser = optparse.OptionParser()
  parser.add_option(
      '--fail',
      dest='return_value',
      action='store_const',
      default=0,
      const=64)
  parser.add_option('--stdout', action='store_true')
  parser.add_option('--stderr', action='store_true')
  parser.add_option('--sleep', action='store_true')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Internal error')

  def do(string):
    # write() instead of the print statement so this parses on Python 2 and 3;
    # the emitted text is identical.
    if options.stdout:
      sys.stdout.write(string.upper() + '\n')
    if options.stderr:
      sys.stderr.write(string.lower() + '\n')

  do('A')
  do('BB')
  do('CCC')
  if options.sleep:
    time.sleep(10)
  return options.return_value


if __name__ == '__main__':
  # When re-invoked by a test with --child, act as the child process instead
  # of running the test suite.
  if len(sys.argv) > 1 and sys.argv[1] == '--child':
    sys.exit(child_main(sys.argv[2:]))
  unittest.main()