From b9552596367ca05df42013f294959977f50fd9fb Mon Sep 17 00:00:00 2001
From: Kevin Van Brunt <kmvanbrunt@gmail.com>
Date: Thu, 10 Oct 2024 17:07:11 -0400
Subject: [PATCH] Fixed bug where async_alert() overwrites readline's
 incremental and non-incremental search prompts.

---
 CHANGELOG.md               |  4 +++
 cmd2/ansi.py               |  6 ++---
 cmd2/cmd2.py               | 50 +++++++++++++++++++++++++++---------
 cmd2/rl_utils.py           | 52 ++++++++++++++++++++++++++++++++++++--
 examples/async_printing.py |  5 ++--
 5 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1dde4766..aad25891 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@
 * Bug Fixes
   * Fixed issue where persistent history file was not saved upon SIGHUP and SIGTERM signals.
   * Multiline commands are no longer fragmented in up-arrow history.
+  * Fixed bug where `async_alert()` overwrites readline's incremental and non-incremental search prompts.
+    * This fix introduces behavior where an updated prompt won't display after an aborted search
+      until a user presses Enter. See [async_printing.py](https://github.com/python-cmd2/cmd2/blob/master/examples/async_printing.py)
+      example for how to handle this case using `Cmd.need_prompt_refresh()`.
 * Enhancements
   * Removed dependency on `attrs` and replaced with [dataclasses](https://docs.python.org/3/library/dataclasses.html)
   * add `allow_clipboard` initialization parameter and attribute to disable ability to
diff --git a/cmd2/ansi.py b/cmd2/ansi.py
index 62b85384..93c2f019 100644
--- a/cmd2/ansi.py
+++ b/cmd2/ansi.py
@@ -1071,9 +1071,9 @@ def async_alert_str(*, terminal_columns: int, prompt: str, line: str, cursor_off
     # Calculate how many terminal lines are taken up by all prompt lines except for the last one.
     # That will be included in the input lines calculations since that is where the cursor is.
     num_prompt_terminal_lines = 0
-    for line in prompt_lines[:-1]:
-        line_width = style_aware_wcswidth(line)
-        num_prompt_terminal_lines += int(line_width / terminal_columns) + 1
+    for prompt_line in prompt_lines[:-1]:
+        prompt_line_width = style_aware_wcswidth(prompt_line)
+        num_prompt_terminal_lines += int(prompt_line_width / terminal_columns) + 1
 
     # Now calculate how many terminal lines are take up by the input
     last_prompt_line = prompt_lines[-1]
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index 79fd2bf2..aec77f1b 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -130,8 +130,10 @@
 from .rl_utils import (
     RlType,
     rl_escape_prompt,
+    rl_get_display_prompt,
     rl_get_point,
     rl_get_prompt,
+    rl_in_search_mode,
     rl_set_prompt,
     rl_type,
     rl_warning,
@@ -3295,6 +3297,12 @@ def _set_up_cmd2_readline(self) -> _SavedReadlineSettings:
         """
         readline_settings = _SavedReadlineSettings()
 
+        if rl_type == RlType.GNU:
+            # To calculate line count when printing async_alerts, we rely on commands wider than
+            # the terminal to wrap across multiple lines. The default for horizontal-scroll-mode
+            # is "off" but a user may have overridden it in their readline initialization file.
+            readline.parse_and_bind("set horizontal-scroll-mode off")
+
         if self._completion_supported():
             # Set up readline for our tab completion needs
             if rl_type == RlType.GNU:
@@ -5270,6 +5278,26 @@ class TestMyAppCase(Cmd2TestCase):
             # Return a failure error code to support automated transcript-based testing
             self.exit_code = 1
 
+    def need_prompt_refresh(self, new_prompt: str) -> bool:  # pragma: no cover
+        """
+        Check if the onscreen prompt needs to be refreshed.
+
+        There are two cases when the onscreen prompt needs to be refreshed.
+        1. self.prompt differs from the new prompt.
+        2. readline's prompt differs from the new prompt.
+
+           This case occurs when async_alert() is called while a user is in search mode (e.g. Ctrl-r).
+           To prevent overwriting readline's onscreen search prompt, async_alert() does not update
+           readline's saved prompt value. Therefore when a user aborts a search, the old prompt
+           is still on screen until they press Enter or another call to async_alert() is made.
+
+        This function is a convenient way for an async alert thread to check both cases.
+
+        :param new_prompt: the new prompt string
+        :return: True if the onscreen prompt needs to be refreshed, otherwise False.
+        """
+        return new_prompt != self.prompt or new_prompt != rl_get_prompt()
+
     def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None:  # pragma: no cover
         """
         Display an important message to the user while they are at a command line prompt.
@@ -5277,7 +5305,7 @@ def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None:
         text and cursor location is left alone.
 
         IMPORTANT: This function will not print an alert unless it can acquire self.terminal_lock to ensure
-                   a prompt is onscreen. Therefore, it is best to acquire the lock before calling this function
+                   a prompt is on screen. Therefore, it is best to acquire the lock before calling this function
                    to guarantee the alert prints and to avoid raising a RuntimeError.
 
                    This function is only needed when you need to print an alert while the main thread is blocking
@@ -5309,20 +5337,21 @@ def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None:
             if new_prompt is not None:
                 self.prompt = new_prompt
 
-            # Check if the prompt to display has changed from what's currently displayed
-            cur_onscreen_prompt = rl_get_prompt()
-            new_onscreen_prompt = self.continuation_prompt if self._at_continuation_prompt else self.prompt
-
-            if new_onscreen_prompt != cur_onscreen_prompt:
-                update_terminal = True
+            # We won't change the onscreen prompt while readline is in search mode (e.g. Ctrl-r).
+            # Otherwise the new prompt will overwrite readline's onscreen search prompt.
+            if not rl_in_search_mode():
+                new_onscreen_prompt = self.continuation_prompt if self._at_continuation_prompt else self.prompt
+                if self.need_prompt_refresh(new_onscreen_prompt):
+                    update_terminal = True
+                    rl_set_prompt(new_onscreen_prompt)
 
             if update_terminal:
                 import shutil
 
-                # Generate the string which will replace the current prompt and input lines with the alert
+                # Print a string which replaces the current prompt and input lines with the alert
                 terminal_str = ansi.async_alert_str(
                     terminal_columns=shutil.get_terminal_size().columns,
-                    prompt=cur_onscreen_prompt,
+                    prompt=rl_get_display_prompt(),
                     line=readline.get_line_buffer(),
                     cursor_offset=rl_get_point(),
                     alert_msg=alert_msg,
@@ -5333,9 +5362,6 @@ def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None:
                 elif rl_type == RlType.PYREADLINE:
                     readline.rl.mode.console.write(terminal_str)
 
-                # Update Readline's prompt before we redraw it
-                rl_set_prompt(new_onscreen_prompt)
-
                 # Redraw the prompt and input lines below the alert
                 rl_force_redisplay()
 
diff --git a/cmd2/rl_utils.py b/cmd2/rl_utils.py
index 28d9d2d6..9eb8c2c6 100644
--- a/cmd2/rl_utils.py
+++ b/cmd2/rl_utils.py
@@ -200,7 +200,7 @@ def rl_get_point() -> int:  # pragma: no cover
 
 
 def rl_get_prompt() -> str:  # pragma: no cover
-    """Gets Readline's current prompt"""
+    """Get Readline's prompt"""
     if rl_type == RlType.GNU:
         encoded_prompt = ctypes.c_char_p.in_dll(readline_lib, "rl_prompt").value
         if encoded_prompt is None:
@@ -221,6 +221,24 @@ def rl_get_prompt() -> str:  # pragma: no cover
     return rl_unescape_prompt(prompt)
 
 
+def rl_get_display_prompt() -> str:  # pragma: no cover
+    """
+    Get Readline's currently displayed prompt.
+
+    In GNU Readline, the displayed prompt sometimes differs from the prompt.
+    This occurs in functions that use the prompt string as a message area, such as incremental search.
+    """
+    if rl_type == RlType.GNU:
+        encoded_prompt = ctypes.c_char_p.in_dll(readline_lib, "rl_display_prompt").value
+        if encoded_prompt is None:
+            prompt = ''
+        else:
+            prompt = encoded_prompt.decode(encoding='utf-8')
+        return rl_unescape_prompt(prompt)
+    else:
+        return rl_get_prompt()
+
+
 def rl_set_prompt(prompt: str) -> None:  # pragma: no cover
     """
     Sets Readline's prompt
@@ -237,7 +255,8 @@ def rl_set_prompt(prompt: str) -> None:  # pragma: no cover
 
 
 def rl_escape_prompt(prompt: str) -> str:
-    """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes
+    """
+    Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes
 
     :param prompt: original prompt
     :return: prompt safe to pass to GNU Readline
@@ -276,3 +295,32 @@ def rl_unescape_prompt(prompt: str) -> str:
         prompt = prompt.replace(escape_start, "").replace(escape_end, "")
 
     return prompt
+
+
+def rl_in_search_mode() -> bool:
+    """Check if readline is doing either an incremental (e.g. Ctrl-r) or non-incremental (e.g. Esc-p) search"""
+    if rl_type == RlType.GNU:
+        # GNU Readline defines constants that we can use to determine if in search mode.
+        #     RL_STATE_ISEARCH    0x0000080
+        #     RL_STATE_NSEARCH    0x0000100
+        IN_SEARCH_MODE = 0x0000180
+
+        readline_state = ctypes.c_int.in_dll(readline_lib, "rl_readline_state").value
+        return bool(IN_SEARCH_MODE & readline_state)
+    elif rl_type == RlType.PYREADLINE:
+        from pyreadline3.modes.emacs import (  # type: ignore[import]
+            EmacsMode,
+        )
+
+        # These search modes only apply to Emacs mode, which is the default.
+        if not isinstance(readline.rl.mode, EmacsMode):
+            return False
+
+        # While in search mode, the current keyevent function is set one of the following.
+        search_funcs = (
+            readline.rl.mode._process_incremental_search_keyevent,
+            readline.rl.mode._process_non_incremental_search_keyevent,
+        )
+        return readline.rl.mode.process_keyevent_queue[-1] in search_funcs
+    else:
+        return False
diff --git a/examples/async_printing.py b/examples/async_printing.py
index 6ff3a262..c5655313 100755
--- a/examples/async_printing.py
+++ b/examples/async_printing.py
@@ -189,8 +189,9 @@ def _alerter_thread_func(self) -> None:
                     new_title = "Alerts Printed: {}".format(self._alert_count)
                     self.set_window_title(new_title)
 
-                # No alerts needed to be printed, check if the prompt changed
-                elif new_prompt != self.prompt:
+                # There are no alerts to print, but we should still check
+                # if the onscreen prompt needs to be refreshed.
+                elif self.need_prompt_refresh(new_prompt):
                     self.async_update_prompt(new_prompt)
 
                 # Don't forget to release the lock