Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support realtime output through a pipe (fixes #145) #202

Merged
merged 1 commit into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
Changelog
~~~~~~~~~

Version 0.31.0
--------------

Enhancements:

- Added `always_flush` argument to `dispatch()` (issue #145)

Version 0.30.0
--------------

Expand Down
36 changes: 34 additions & 2 deletions src/argh/dispatching.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def dispatch(
raw_output: bool = False,
namespace: Optional[argparse.Namespace] = None,
skip_unknown_args: bool = False,
always_flush: bool = False,
) -> Optional[str]:
"""
Parses given list of arguments using given parser, calls the relevant
Expand Down Expand Up @@ -142,6 +143,19 @@ def dispatch(
that support for combined default and nested functions may be broken
if a different type of object is forced.

:param always_flush:

If the output stream is not a terminal (i.e. redirected to a file or
another process), it's probably buffered. In most cases it doesn't
matter.

However, if the output of your program is generated with delays
between the lines and you may want to redirect them to another process
and immediately see the results (e.g. `my_app.py | grep something`),
it's a good idea to force flushing of the buffer.

.. versionadded:: 0.31

By default the exceptions are not wrapped and will propagate. The only
exception that is always wrapped is :class:`~argh.exceptions.CommandError`
which is interpreted as an expected event so the traceback is hidden.
Expand Down Expand Up @@ -182,6 +196,7 @@ def dispatch(
output_file=output_file,
errors_file=errors_file,
raw_output=raw_output,
always_flush=always_flush,
)


Expand Down Expand Up @@ -225,20 +240,29 @@ def run_endpoint_function(
output_file: IO = sys.stdout,
errors_file: IO = sys.stderr,
raw_output: bool = False,
always_flush: bool = False,
) -> Optional[str]:
"""
.. versionadded:: 0.30

Extracts arguments from the namespace object, calls the endpoint function
and processes its output.

:param always_flush:

Flush the buffer after every line even if `output_file` is not a TTY.
Turn this off if you don't need dynamic output)
"""
lines = _execute_command(function, namespace_obj, errors_file)

return _process_command_output(lines, output_file, raw_output)
return _process_command_output(lines, output_file, raw_output, always_flush)


def _process_command_output(
lines: Iterator[str], output_file: Optional[IO], raw_output: bool
lines: Iterator[str],
output_file: Optional[IO],
raw_output: bool,
always_flush: bool,
) -> Optional[str]:
out_io: IO

Expand All @@ -260,6 +284,14 @@ def _process_command_output(
# in most cases user wants one message per line
out_io.write("\n")

# If it's not a terminal (i.e. redirected to a file or another
# process), it's probably buffered. In most cases it doesn't matter
# but if the output is generated with delays between the lines and we
# may want to monitor it (e.g. `my_app.py | grep something`), it's a
# good idea to force flushing.
if always_flush:
out_io.flush()

if output_file is None:
# user wanted a string; return contents of our temporary file-like obj
out_io.seek(0)
Expand Down
20 changes: 20 additions & 0 deletions tests/test_dispatching.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@ def func():
mock_dispatch.assert_called_with(mock_parser)


@pytest.mark.parametrize("always_flush", [True, False])
def test_run_endpoint_function__always_flush(always_flush):
def func():
return ["first line", "second line"]

out_io = Mock(spec=io.StringIO)

argh.dispatching.run_endpoint_function(
func, argparse.Namespace(), output_file=out_io, always_flush=always_flush
)

if always_flush:
assert out_io.flush.call_count == 2
else:
out_io.flush.assert_not_called()


@patch("argh.dispatching.parse_and_resolve")
@patch("argh.dispatching.run_endpoint_function")
def test_dispatch_command_two_stage(mock_run_endpoint_function, mock_parse_and_resolve):
Expand All @@ -68,6 +85,7 @@ def func() -> str:
mock_errors_file = Mock(io.TextIOBase)
raw_output = False
skip_unknown_args = False
always_flush = True
mock_endpoint_function = Mock()
mock_namespace = Mock(argparse.Namespace)
mock_namespace_obj = Mock(argparse.Namespace)
Expand All @@ -83,6 +101,7 @@ def func() -> str:
output_file=mock_output_file,
errors_file=mock_errors_file,
raw_output=raw_output,
always_flush=always_flush,
)

mock_parse_and_resolve.assert_called_with(
Expand All @@ -98,6 +117,7 @@ def func() -> str:
output_file=mock_output_file,
errors_file=mock_errors_file,
raw_output=raw_output,
always_flush=always_flush,
)
assert retval == "run_endpoint_function retval"

Expand Down