From a620f2bae82e11cd554325f340946528061854aa Mon Sep 17 00:00:00 2001 From: Andy Mikhailenko Date: Sun, 22 Oct 2023 16:46:33 +0200 Subject: [PATCH] feat: support realtime output through a pipe (fixes #145) (#202) If the output stream is not a terminal (i.e. redirected to a file or another process), it's probably buffered. In most cases it doesn't matter. However, if the output of your program is generated with delays between the lines and you may want to redirect them to another process and immediately see the results (e.g. `my_app.py | grep something`), it's a good idea to force flushing of the buffer. To do so, set `dispatch(..., always_flush=True)`. --- CHANGES.rst | 7 +++++++ src/argh/dispatching.py | 36 ++++++++++++++++++++++++++++++++++-- tests/test_dispatching.py | 20 ++++++++++++++++++++ 3 files changed, 61 insertions(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 654cc5e..cc973f3 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,13 @@ Changelog ~~~~~~~~~ +Version 0.31.0 +-------------- + +Enhancements: + +- Added `always_flush` argument to `dispatch()` (issue #145) + Version 0.30.0 -------------- diff --git a/src/argh/dispatching.py b/src/argh/dispatching.py index fb4b085..a983f7b 100644 --- a/src/argh/dispatching.py +++ b/src/argh/dispatching.py @@ -79,6 +79,7 @@ def dispatch( raw_output: bool = False, namespace: Optional[argparse.Namespace] = None, skip_unknown_args: bool = False, + always_flush: bool = False, ) -> Optional[str]: """ Parses given list of arguments using given parser, calls the relevant @@ -142,6 +143,19 @@ def dispatch( that support for combined default and nested functions may be broken if a different type of object is forced. + :param always_flush: + + If the output stream is not a terminal (i.e. redirected to a file or + another process), it's probably buffered. In most cases it doesn't + matter. + + However, if the output of your program is generated with delays + between the lines and you may want to redirect them to another process + and immediately see the results (e.g. `my_app.py | grep something`), + it's a good idea to force flushing of the buffer. + + .. versionadded:: 0.31 + By default the exceptions are not wrapped and will propagate. The only exception that is always wrapped is :class:`~argh.exceptions.CommandError` which is interpreted as an expected event so the traceback is hidden. @@ -182,6 +196,7 @@ def dispatch( output_file=output_file, errors_file=errors_file, raw_output=raw_output, + always_flush=always_flush, ) @@ -225,20 +240,29 @@ def run_endpoint_function( output_file: IO = sys.stdout, errors_file: IO = sys.stderr, raw_output: bool = False, + always_flush: bool = False, ) -> Optional[str]: """ .. versionadded:: 0.30 Extracts arguments from the namespace object, calls the endpoint function and processes its output. + + :param always_flush: + + Flush the buffer after every line even if `output_file` is not a TTY. + Turn this off if you don't need dynamic output) """ lines = _execute_command(function, namespace_obj, errors_file) - return _process_command_output(lines, output_file, raw_output) + return _process_command_output(lines, output_file, raw_output, always_flush) def _process_command_output( - lines: Iterator[str], output_file: Optional[IO], raw_output: bool + lines: Iterator[str], + output_file: Optional[IO], + raw_output: bool, + always_flush: bool, ) -> Optional[str]: out_io: IO @@ -260,6 +284,14 @@ def _process_command_output( # in most cases user wants one message per line out_io.write("\n") + # If it's not a terminal (i.e. redirected to a file or another + # process), it's probably buffered. In most cases it doesn't matter + # but if the output is generated with delays between the lines and we + # may want to monitor it (e.g. `my_app.py | grep something`), it's a + # good idea to force flushing. + if always_flush: + out_io.flush() + if output_file is None: # user wanted a string; return contents of our temporary file-like obj out_io.seek(0) diff --git a/tests/test_dispatching.py b/tests/test_dispatching.py index 987b8c8..38fbd1c 100644 --- a/tests/test_dispatching.py +++ b/tests/test_dispatching.py @@ -54,6 +54,23 @@ def func(): mock_dispatch.assert_called_with(mock_parser) +@pytest.mark.parametrize("always_flush", [True, False]) +def test_run_endpoint_function__always_flush(always_flush): + def func(): + return ["first line", "second line"] + + out_io = Mock(spec=io.StringIO) + + argh.dispatching.run_endpoint_function( + func, argparse.Namespace(), output_file=out_io, always_flush=always_flush + ) + + if always_flush: + assert out_io.flush.call_count == 2 + else: + out_io.flush.assert_not_called() + + @patch("argh.dispatching.parse_and_resolve") @patch("argh.dispatching.run_endpoint_function") def test_dispatch_command_two_stage(mock_run_endpoint_function, mock_parse_and_resolve): @@ -68,6 +85,7 @@ def func() -> str: mock_errors_file = Mock(io.TextIOBase) raw_output = False skip_unknown_args = False + always_flush = True mock_endpoint_function = Mock() mock_namespace = Mock(argparse.Namespace) mock_namespace_obj = Mock(argparse.Namespace) @@ -83,6 +101,7 @@ def func() -> str: output_file=mock_output_file, errors_file=mock_errors_file, raw_output=raw_output, + always_flush=always_flush, ) mock_parse_and_resolve.assert_called_with( @@ -98,6 +117,7 @@ def func() -> str: output_file=mock_output_file, errors_file=mock_errors_file, raw_output=raw_output, + always_flush=always_flush, ) assert retval == "run_endpoint_function retval"