diff --git a/sen/tui/buffer.py b/sen/tui/buffer.py index 281c6d9..d49fdc7 100644 --- a/sen/tui/buffer.py +++ b/sen/tui/buffer.py @@ -1,8 +1,12 @@ import logging + from sen.docker_backend import DockerContainer from sen.exceptions import NotifyError from sen.tui.constants import HELP_TEXT -from sen.tui.widget import AsyncScrollableListBox, MainListBox, ScrollableListBox, get_operation_notify_widget +from sen.tui.widgets.list.main import MainListBox +from sen.tui.widgets.list.util import get_operation_notify_widget +from sen.tui.widgets.list.common import AsyncScrollableListBox, ScrollableListBox + logger = logging.getLogger(__name__) diff --git a/sen/tui/widgets/__init__.py b/sen/tui/widgets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sen/tui/widgets/list/__init__.py b/sen/tui/widgets/list/__init__.py new file mode 100644 index 0000000..712451e --- /dev/null +++ b/sen/tui/widgets/list/__init__.py @@ -0,0 +1,3 @@ +""" +module with widgets for displaying lists of objects +""" \ No newline at end of file diff --git a/sen/tui/widgets/list/base.py b/sen/tui/widgets/list/base.py new file mode 100644 index 0000000..cf127be --- /dev/null +++ b/sen/tui/widgets/list/base.py @@ -0,0 +1,170 @@ +import logging + +import urwid +from sen.exceptions import NotifyError + + +logger = logging.getLogger(__name__) + + +class WidgetBase(urwid.ListBox): + """ + common class fot widgets + """ + + def __init__(self, *args, **kwargs): + self.search_string = None + self.filter_query = None + super().__init__(*args, **kwargs) + self.ro_content = self.body[:] # unfiltered content of a widget + + def set_body(self, widgets): + self.body[:] = widgets + + def reload_widget(self): + # this is the easiest way to refresh body + self.body[:] = self.body + + def _search(self, reverse_search=False): + if self.search_string is None: + raise NotifyError("No search pattern specified.") + if not self.search_string: + self.search_string = None + return + pos = self.focus_position + original_position = pos + wrapped = False + while True: + if reverse_search: + obj, pos = self.body.get_prev(pos) + else: + obj, pos = self.body.get_next(pos) + if obj is None: + # wrap + wrapped = True + if reverse_search: + obj, pos = self.body[-1], len(self.body) + else: + obj, pos = self.body[0], 0 + if wrapped and ( + (pos > original_position and not reverse_search) or + (pos < original_position and reverse_search) + ): + self.search_string = None + raise NotifyError("Pattern not found: %r." % self.search_string) + # FIXME: figure out nicer search api + if hasattr(obj, "matches_search"): + condition = obj.matches_search(self.search_string) + else: + condition = self.search_string in obj.original_widget.text + if condition: + self.set_focus(pos) + self.reload_widget() + break + + def filter(self, s, widgets_to_filter=None): + s = s.strip() + + if not s: + self.filter_query = None + self.body[:] = self.ro_content + return + + widgets = [] + for obj in widgets_to_filter or self.ro_content: + + # FIXME: figure out nicer search api + if hasattr(obj, "matches_search"): + condition = obj.matches_search(s) + else: + condition = s in obj.original_widget.text + + if condition: + widgets.append(obj) + if not widgets_to_filter: + self.filter_query = s + return widgets + + def find_previous(self, search_pattern=None): + if search_pattern is not None: + self.search_string = search_pattern + self._search(reverse_search=True) + + def find_next(self, search_pattern=None): + if search_pattern is not None: + self.search_string = search_pattern + self._search() + + def status_bar(self): + columns_list = [] + + def add_subwidget(markup, color_attr=None): + if color_attr is None: + w = urwid.AttrMap(urwid.Text(markup), "status_text") + else: + w = urwid.AttrMap(urwid.Text(markup), color_attr) + columns_list.append((len(markup), w)) + + if self.search_string: + add_subwidget("Search: ") + add_subwidget(repr(self.search_string)) + + if self.search_string and self.filter_query: + add_subwidget(", ") + + if self.filter_query: + add_subwidget("Filter: ") + add_subwidget(repr(self.filter_query)) + + return columns_list + + +class VimMovementListBox(WidgetBase): + """ + ListBox with vim-like movement which can be inherited in other widgets + """ + + def __init__(self, *args, **kwargs): + # we want "gg"! + self.cached_key = None + super().__init__(*args, **kwargs) + + def keypress(self, size, key): + logger.debug("VimListBox keypress %r", key) + + # FIXME: workaround so we allow "gg" only, and not "g*" + if self.cached_key == "g" and key != "g": + self.cached_key = None + + if key == "j": + return super().keypress(size, "down") + elif key == "k": + return super().keypress(size, "up") + elif key == "ctrl d": + try: + self.set_focus(self.get_focus()[1] + 10) + except IndexError: + self.set_focus(len(self.body) - 1) + self.reload_widget() + return + elif key == "ctrl u": + try: + self.set_focus(self.get_focus()[1] - 10) + except IndexError: + self.set_focus(0) + self.reload_widget() + return + elif key == "G": + self.set_focus(len(self.body) - 1) + self.body[:] = self.body + return + elif key == "g": + if self.cached_key is None: + self.cached_key = "g" + elif self.cached_key == "g": + self.set_focus(0) + self.reload_widget() + self.cached_key = None + return + key = super().keypress(size, key) + return key diff --git a/sen/tui/widgets/list/common.py b/sen/tui/widgets/list/common.py new file mode 100644 index 0000000..5931d8b --- /dev/null +++ b/sen/tui/widgets/list/common.py @@ -0,0 +1,59 @@ +import logging +import threading + +import urwid + +from sen.tui.widgets.list.base import VimMovementListBox +from sen.util import _ensure_unicode + + +logger = logging.getLogger(__name__) + + +class ScrollableListBox(VimMovementListBox): + def __init__(self, text): + text = _ensure_unicode(text) + list_of_texts = text.split("\n") + self.walker = urwid.SimpleFocusListWalker([ + urwid.AttrMap(urwid.Text(t, align="left", wrap="any"), "main_list_dg", "main_list_white") + for t in list_of_texts + ]) + super().__init__(self.walker) + + +class AsyncScrollableListBox(VimMovementListBox): + def __init__(self, generator, ui, static_data=None): + self.log_texts = [] + if static_data: + static_data = _ensure_unicode(static_data).split("\n") + for d in static_data: + log_entry = d.strip() + if log_entry: + self.log_texts.append(urwid.Text(("main_list_dg", log_entry), + align="left", wrap="any")) + walker = urwid.SimpleFocusListWalker(self.log_texts) + super(AsyncScrollableListBox, self).__init__(walker) + + def fetch_logs(): + for line in generator: + line = _ensure_unicode(line) + if self.stop.is_set(): + break + if self.filter_query: + if self.filter_query not in line: + continue + walker.append( + urwid.AttrMap( + urwid.Text(line.strip(), align="left", wrap="any"), "main_list_dg", "main_list_white" + ) + ) + walker.set_focus(len(walker) - 1) + ui.refresh() + + self.stop = threading.Event() + self.thread = threading.Thread(target=fetch_logs, daemon=True) + self.thread.start() + + def destroy(self): + self.stop.set() + diff --git a/sen/tui/widget.py b/sen/tui/widgets/list/main.py similarity index 63% rename from sen/tui/widget.py rename to sen/tui/widgets/list/main.py index f971625..b78875b 100644 --- a/sen/tui/widget.py +++ b/sen/tui/widgets/list/main.py @@ -1,4 +1,3 @@ -import datetime import logging import re import threading @@ -8,73 +7,13 @@ from sen.tui.constants import MAIN_LIST_FOCUS from sen.docker_backend import DockerImage, DockerContainer -from sen.util import _ensure_unicode, log_traceback +from sen.tui.widgets.list.base import VimMovementListBox +from sen.tui.widgets.list.util import get_map, get_time_attr_map, get_operation_notify_widget +from sen.tui.widgets.util import AdHocAttrMap +from sen.util import log_traceback -logger = logging.getLogger(__name__) - - -def get_color_text(markup, color_attr="status_text"): - w = urwid.AttrMap(urwid.Text(markup), color_attr) - return len(markup), w - -def get_operation_notify_widget(operation, notif_level="info", display_always=True): - attr = "notif_{}".format(notif_level) - took = operation.took - text_list = [] - if took > 300: - fmt_str = "{} Query took " - text_list.append((attr, fmt_str.format(operation.pretty_message))) - command_took_str = "{:.2f}".format(took) - if took < 500: - text_list.append(("notif_text_yellow", command_took_str)) - elif took < 1000: - text_list.append(("notif_text_orange", command_took_str)) - else: - command_took_str = "{:.2f}".format(took / 1000.0) - text_list.append(("notif_text_red", command_took_str)) - text_list.append((attr, " s")) - if took < 1000: - text_list.append((attr, " ms")) - elif display_always: - text_list.append((attr, operation.pretty_message)) - else: - return - return urwid.AttrMap(urwid.Text(text_list), attr) - - -class AdHocAttrMap(urwid.AttrMap): - """ - Ad-hoc attr map change - - taken from https://github.com/pazz/alot/ - """ - def __init__(self, w, maps, init_map='normal'): - self.maps = maps - urwid.AttrMap.__init__(self, w, maps[init_map]) - - def set_map(self, attrstring): - self.set_attr_map({None: self.maps[attrstring]}) - - -def get_map(defult="main_list_dg"): - return {"normal": defult, "focus": MAIN_LIST_FOCUS} - - -def get_time_attr_map(t): - """ - now -> | - hour ago -> | - day ago -> | - |--------------|--------------------|---------------------| - """ - now = datetime.datetime.now() - if t + datetime.timedelta(hours=3) > now: - return get_map("main_list_white") - if t + datetime.timedelta(days=3) > now: - return get_map("main_list_lg") - else: - return get_map("main_list_dg") +logger = logging.getLogger(__name__) class MainLineWidget(urwid.AttrMap): @@ -192,217 +131,6 @@ def __str__(self): return "{}".format(self.docker_object) -class WidgetBase(urwid.ListBox): - """ - common class fot widgets - """ - - def __init__(self, *args, **kwargs): - self.search_string = None - self.filter_query = None - super().__init__(*args, **kwargs) - self.ro_content = self.body[:] # unfiltered content of a widget - - def set_body(self, widgets): - self.body[:] = widgets - - def reload_widget(self): - # this is the easiest way to refresh body - self.body[:] = self.body - - def _search(self, reverse_search=False): - if self.search_string is None: - raise NotifyError("No search pattern specified.") - if not self.search_string: - self.search_string = None - return - pos = self.focus_position - original_position = pos - wrapped = False - while True: - if reverse_search: - obj, pos = self.body.get_prev(pos) - else: - obj, pos = self.body.get_next(pos) - if obj is None: - # wrap - wrapped = True - if reverse_search: - obj, pos = self.body[-1], len(self.body) - else: - obj, pos = self.body[0], 0 - if wrapped and ( - (pos > original_position and not reverse_search) or - (pos < original_position and reverse_search) - ): - self.search_string = None - raise NotifyError("Pattern not found: %r." % self.search_string) - # FIXME: figure out nicer search api - if hasattr(obj, "matches_search"): - condition = obj.matches_search(self.search_string) - else: - condition = self.search_string in obj.original_widget.text - if condition: - self.set_focus(pos) - self.reload_widget() - break - - def filter(self, s, widgets_to_filter=None): - s = s.strip() - - if not s: - self.filter_query = None - self.body[:] = self.ro_content - return - - widgets = [] - for obj in widgets_to_filter or self.ro_content: - - # FIXME: figure out nicer search api - if hasattr(obj, "matches_search"): - condition = obj.matches_search(s) - else: - condition = s in obj.original_widget.text - - if condition: - widgets.append(obj) - if not widgets_to_filter: - self.filter_query = s - return widgets - - def find_previous(self, search_pattern=None): - if search_pattern is not None: - self.search_string = search_pattern - self._search(reverse_search=True) - - def find_next(self, search_pattern=None): - if search_pattern is not None: - self.search_string = search_pattern - self._search() - - def status_bar(self): - columns_list = [] - - def add_subwidget(markup, color_attr=None): - if color_attr is None: - w = urwid.AttrMap(urwid.Text(markup), "status_text") - else: - w = urwid.AttrMap(urwid.Text(markup), color_attr) - columns_list.append((len(markup), w)) - - if self.search_string: - add_subwidget("Search: ") - add_subwidget(repr(self.search_string)) - - if self.search_string and self.filter_query: - add_subwidget(", ") - - if self.filter_query: - add_subwidget("Filter: ") - add_subwidget(repr(self.filter_query)) - - return columns_list - - -class VimMovementListBox(WidgetBase): - """ - ListBox with vim-like movement which can be inherited in other widgets - """ - - def __init__(self, *args, **kwargs): - # we want "gg"! - self.cached_key = None - super().__init__(*args, **kwargs) - - def keypress(self, size, key): - logger.debug("VimListBox keypress %r", key) - - # FIXME: workaround so we allow "gg" only, and not "g*" - if self.cached_key == "g" and key != "g": - self.cached_key = None - - if key == "j": - return super().keypress(size, "down") - elif key == "k": - return super().keypress(size, "up") - elif key == "ctrl d": - try: - self.set_focus(self.get_focus()[1] + 10) - except IndexError: - self.set_focus(len(self.body) - 1) - self.reload_widget() - return - elif key == "ctrl u": - try: - self.set_focus(self.get_focus()[1] - 10) - except IndexError: - self.set_focus(0) - self.reload_widget() - return - elif key == "G": - self.set_focus(len(self.body) - 1) - self.body[:] = self.body - return - elif key == "g": - if self.cached_key is None: - self.cached_key = "g" - elif self.cached_key == "g": - self.set_focus(0) - self.reload_widget() - self.cached_key = None - return - key = super().keypress(size, key) - return key - - -class ScrollableListBox(VimMovementListBox): - def __init__(self, text): - text = _ensure_unicode(text) - list_of_texts = text.split("\n") - self.walker = urwid.SimpleFocusListWalker([ - urwid.AttrMap(urwid.Text(t, align="left", wrap="any"), "main_list_dg", "main_list_white") - for t in list_of_texts - ]) - super().__init__(self.walker) - - -class AsyncScrollableListBox(VimMovementListBox): - def __init__(self, generator, ui, static_data=None): - self.log_texts = [] - if static_data: - static_data = _ensure_unicode(static_data).split("\n") - for d in static_data: - log_entry = d.strip() - if log_entry: - self.log_texts.append(urwid.Text(("main_list_dg", log_entry), - align="left", wrap="any")) - walker = urwid.SimpleFocusListWalker(self.log_texts) - super(AsyncScrollableListBox, self).__init__(walker) - - def fetch_logs(): - for line in generator: - line = _ensure_unicode(line) - if self.stop.is_set(): - break - if self.filter_query: - if self.filter_query not in line: - continue - walker.append( - urwid.AttrMap( - urwid.Text(line.strip(), align="left", wrap="any"), "main_list_dg", "main_list_white" - ) - ) - walker.set_focus(len(walker) - 1) - ui.refresh() - - self.stop = threading.Event() - self.thread = threading.Thread(target=fetch_logs, daemon=True) - self.thread.start() - - def destroy(self): - self.stop.set() - - class MainListBox(VimMovementListBox): def __init__(self, docker_backend, ui): self.d = docker_backend diff --git a/sen/tui/widgets/list/util.py b/sen/tui/widgets/list/util.py new file mode 100644 index 0000000..6d6c9b2 --- /dev/null +++ b/sen/tui/widgets/list/util.py @@ -0,0 +1,60 @@ +import datetime +import logging + +import urwid + +from sen.tui.constants import MAIN_LIST_FOCUS + + +logger = logging.getLogger(__name__) + + +def get_color_text(markup, color_attr="status_text"): + w = urwid.AttrMap(urwid.Text(markup), color_attr) + return len(markup), w + + +def get_operation_notify_widget(operation, notif_level="info", display_always=True): + attr = "notif_{}".format(notif_level) + took = operation.took + text_list = [] + if took > 300: + fmt_str = "{} Query took " + text_list.append((attr, fmt_str.format(operation.pretty_message))) + command_took_str = "{:.2f}".format(took) + if took < 500: + text_list.append(("notif_text_yellow", command_took_str)) + elif took < 1000: + text_list.append(("notif_text_orange", command_took_str)) + else: + command_took_str = "{:.2f}".format(took / 1000.0) + text_list.append(("notif_text_red", command_took_str)) + text_list.append((attr, " s")) + if took < 1000: + text_list.append((attr, " ms")) + elif display_always: + text_list.append((attr, operation.pretty_message)) + else: + return + return urwid.AttrMap(urwid.Text(text_list), attr) + + +def get_map(defult="main_list_dg"): + return {"normal": defult, "focus": MAIN_LIST_FOCUS} + + +def get_time_attr_map(t): + """ + now -> | + hour ago -> | + day ago -> | + |--------------|--------------------|---------------------| + """ + now = datetime.datetime.now() + if t + datetime.timedelta(hours=3) > now: + return get_map("main_list_white") + if t + datetime.timedelta(days=3) > now: + return get_map("main_list_lg") + else: + return get_map("main_list_dg") + diff --git a/sen/tui/widgets/util.py b/sen/tui/widgets/util.py new file mode 100644 index 0000000..af358f6 --- /dev/null +++ b/sen/tui/widgets/util.py @@ -0,0 +1,16 @@ +import urwid + + +class AdHocAttrMap(urwid.AttrMap): + """ + Ad-hoc attr map change + + taken from https://github.com/pazz/alot/ + """ + def __init__(self, w, maps, init_map='normal'): + self.maps = maps + urwid.AttrMap.__init__(self, w, maps[init_map]) + + def set_map(self, attrstring): + self.set_attr_map({None: self.maps[attrstring]}) + diff --git a/tests/test_widgets.py b/tests/test_widgets.py index 6ec1729..e59593c 100644 --- a/tests/test_widgets.py +++ b/tests/test_widgets.py @@ -1,6 +1,6 @@ import pytest from flexmock import flexmock -from sen.tui.widget import ScrollableListBox, AsyncScrollableListBox +from sen.tui.widgets.list.common import ScrollableListBox, AsyncScrollableListBox from .constants import SCREEN_WIDTH, SCREEN_HEIGHT