depot_tools: add create_temp_file.py
It takes text from stdin and dump the read text to a temp file. Bug: 428193406 Change-Id: I268dc4905e157d619d1af7d0b96f711f224f39db Reviewed-on: https://chromium-review.googlesource.com/c/chromium/tools/depot_tools/+/6789063 Reviewed-by: Gavin Mak <gavinmak@google.com> Commit-Queue: Scott Lee <ddoman@chromium.org>changes/63/6789063/4
parent
435e3b303e
commit
4fd484473c
@ -0,0 +1,40 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
|
||||
def run(prefix=None, suffix=None):
|
||||
""" Reads text from stdin, writes it to a temporary file.
|
||||
|
||||
The path of the temporary file is printed to stdout on success.
|
||||
"""
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(mode='w+',
|
||||
delete=False,
|
||||
encoding='utf-8',
|
||||
prefix=prefix,
|
||||
suffix=suffix) as temp_file:
|
||||
for line in sys.stdin:
|
||||
temp_file.write(line)
|
||||
temp_file.flush()
|
||||
temp_file_path = temp_file.name
|
||||
|
||||
print(temp_file_path)
|
||||
|
||||
except Exception as e:
|
||||
print(f"An error occurred: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Writes stdin to a temporary file and then print the path.",
|
||||
)
|
||||
parser.add_argument('--prefix',
|
||||
help="Optional prefix for the temporary file name.")
|
||||
parser.add_argument('--suffix',
|
||||
help="Optional suffix for the temporary file name.")
|
||||
args = parser.parse_args()
|
||||
run(prefix=args.prefix, suffix=args.suffix)
|
||||
@ -0,0 +1,116 @@
|
||||
#!/usr/bin/env vpython3
|
||||
# Copyright (c) 2022 The Chromium Authors. All rights reserved.
|
||||
# Use of this source code is governed by a BSD-style license that can be
|
||||
# found in the LICENSE file.
|
||||
|
||||
import argparse
|
||||
import unittest
|
||||
import sys
|
||||
import os
|
||||
import io
|
||||
import tempfile
|
||||
from unittest.mock import patch
|
||||
|
||||
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.insert(0, ROOT_DIR)
|
||||
|
||||
import create_temp_file
|
||||
|
||||
|
||||
class TestCreateTempFileFromStdin(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test environment: capture stdout and stderr."""
|
||||
self.held_stdout = sys.stdout
|
||||
self.held_stderr = sys.stderr
|
||||
self.new_stdout = io.StringIO()
|
||||
self.new_stderr = io.StringIO()
|
||||
sys.stdout = self.new_stdout
|
||||
sys.stderr = self.new_stderr
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up test environment: restore stdout and stderr."""
|
||||
sys.stdout = self.held_stdout
|
||||
sys.stderr = self.held_stderr
|
||||
|
||||
def _read_content_and_delete_file(self, file_path):
|
||||
"""Helper to read content of a temporary file."""
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
ret = f.read()
|
||||
os.remove(file_path)
|
||||
return ret
|
||||
|
||||
@patch('sys.stdin', new_callable=io.StringIO)
|
||||
def test_basic_text_input(self, mock_stdin):
|
||||
"""Test with basic multi-line text input."""
|
||||
test_content = "Hello, world!\nThis is a test.\n"
|
||||
mock_stdin.write(test_content)
|
||||
mock_stdin.seek(0) # Rewind the mock stdin to the beginning
|
||||
|
||||
create_temp_file.run()
|
||||
|
||||
# Get the output printed to stdout (the temp file path)
|
||||
output = self.new_stdout.getvalue().strip()
|
||||
self.assertTrue(os.path.exists(output)) # Check if file was created
|
||||
|
||||
# Verify content
|
||||
file_content = self._read_content_and_delete_file(output)
|
||||
self.assertEqual(self.new_stderr.getvalue(), "") # No errors on stderr
|
||||
self.assertEqual(file_content, test_content)
|
||||
|
||||
@patch('sys.stdin', new_callable=io.StringIO)
|
||||
def test_empty_input(self, mock_stdin):
|
||||
"""Test with empty input from stdin."""
|
||||
test_content = ""
|
||||
mock_stdin.write(test_content)
|
||||
mock_stdin.seek(0)
|
||||
|
||||
create_temp_file.run()
|
||||
output = self.new_stdout.getvalue().strip()
|
||||
self.assertTrue(os.path.exists(output)) # File should still be created
|
||||
file_content = self._read_content_and_delete_file(output)
|
||||
self.assertEqual(file_content, test_content)
|
||||
self.assertFalse(os.path.exists(output))
|
||||
self.assertEqual(self.new_stderr.getvalue(), "")
|
||||
|
||||
@patch('sys.stdin', new_callable=io.StringIO)
|
||||
def test_with_prefix(self, mock_stdin):
|
||||
"""Test if the --prefix argument is respected."""
|
||||
test_content = "Prefix test content.\n"
|
||||
mock_stdin.write(test_content)
|
||||
mock_stdin.seek(0)
|
||||
|
||||
# Simulate argparse.args
|
||||
mock_args = argparse.Namespace(prefix="myprefix_", suffix=None)
|
||||
with patch('argparse.ArgumentParser.parse_args',
|
||||
return_value=mock_args):
|
||||
create_temp_file.run(prefix=mock_args.prefix)
|
||||
|
||||
output = self.new_stdout.getvalue().strip()
|
||||
self.assertTrue(os.path.exists(output))
|
||||
self.assertTrue(os.path.basename(output).startswith("myprefix_"))
|
||||
file_content = self._read_content_and_delete_file(output)
|
||||
self.assertEqual(file_content, test_content)
|
||||
|
||||
@patch('sys.stdin', new_callable=io.StringIO)
|
||||
def test_with_suffix(self, mock_stdin):
|
||||
"""Test if the --suffix argument is respected."""
|
||||
test_content = "Suffix test content.\n"
|
||||
mock_stdin.write(test_content)
|
||||
mock_stdin.seek(0)
|
||||
|
||||
# Simulate argparse.args
|
||||
mock_args = argparse.Namespace(prefix=None, suffix=".log")
|
||||
with patch('argparse.ArgumentParser.parse_args',
|
||||
return_value=mock_args):
|
||||
create_temp_file.run(suffix=mock_args.suffix)
|
||||
|
||||
output = self.new_stdout.getvalue().strip()
|
||||
self.assertTrue(os.path.exists(output))
|
||||
self.assertTrue(os.path.basename(output).endswith(".log"))
|
||||
file_content = self._read_content_and_delete_file(output)
|
||||
self.assertEqual(file_content, test_content)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Loading…
Reference in New Issue