diff --git a/art/OmegaLogo.png b/art/OmegaLogo.png
new file mode 100644
index 0000000..66afeaa
Binary files /dev/null and b/art/OmegaLogo.png differ
diff --git a/art/OmegaLogoIcon.png b/art/OmegaLogoIcon.png
new file mode 100644
index 0000000..7370d42
Binary files /dev/null and b/art/OmegaLogoIcon.png differ
diff --git a/meta.yaml b/meta.yaml
new file mode 100644
index 0000000..4178778
--- /dev/null
+++ b/meta.yaml
@@ -0,0 +1,75 @@
+{% set name = "napari-chatgpt" %}
+{% set version = "2024.2.19" %}
+ name: {{ name|lower }}
+ version: {{ version }}
+ url: https://pypi.io/packages/source/{{ name[0] }}/{{ name }}/napari-chatgpt-{{ version }}.tar.gz
+ sha256: 4b7238d46766db40fff48735028ff17cd4229f1028a212646f381d459ab89625
+ noarch: python
+ script: {{ PYTHON }} -m pip install . -vv --no-deps --no-build-isolation
+ number: 0
+ host:
+ - python >=3.9
+ - setuptools >=42.0.0
+ - wheel
+ - setuptools-scm
+ - pip
+ run:
+ - python >=3.9
+ - numpy
+ - magicgui
+ - scikit-image
+ - qtpy
+ - qtawesome
+ - langchain ==0.1.5
+ - langchain-openai ==0.0.5
+ - openai
+ - anthropic
+ - fastapi
+ - uvicorn
+ - websockets
+ - tiktoken
+ - wikipedia
+ - lxml
+ - gtts
+ - playsound
+ - matplotlib-base
+ - xarray
+ - arbol
+ - microsoft::playwright
+ - duckduckgo-search
+ - ome-zarr
+ - transformers
+ - cryptography
+ - tabulate
+ - numba
+ - imageio
+ - notebook
+ - nbformat
+ - jedi
+ - black
+# imports:
+# - napari_chatgpt
+# commands:
+# - pip check
+# requires:
+# - pip
+ home: https://github.com/royerlab/napari-chatgpt
+ summary: A napari plugin to process and analyse images with chatGPT.
+ license: BSD-3-Clause
+ license_file: LICENSE
+ recipe-maintainers:
+ - royerloic
diff --git a/setup.cfg b/setup.cfg
index 09ae717..4e3cc22 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
name = napari-chatgpt
-version = v2024.2.4
+version = v2024.2.24
description = A napari plugin to process and analyse images with chatGPT.
long_description = file: README.md
long_description_content_type = text/markdown
@@ -35,6 +35,7 @@ install_requires =
+ QtAwesome
@@ -60,6 +61,8 @@ install_requires =
+ jedi
+ black
python_requires = >=3.9
diff --git a/src/microplugin/__init__.py b/src/microplugin/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/microplugin/code_editor/__init__.py b/src/microplugin/code_editor/__init__.py
new file mode 100644
index 0000000..6e4b0de
--- /dev/null
+++ b/src/microplugin/code_editor/__init__.py
@@ -0,0 +1 @@
+# hello
diff --git a/src/microplugin/code_editor/clickable_icon.py b/src/microplugin/code_editor/clickable_icon.py
new file mode 100644
index 0000000..78123ac
--- /dev/null
+++ b/src/microplugin/code_editor/clickable_icon.py
@@ -0,0 +1,100 @@
+from typing import Union
+from qtpy.QtCore import Qt, Signal
+from qtpy.QtGui import QIcon, QPixmap, QColor, QImage
+from qtpy.QtWidgets import QLabel
+class ClickableIcon(QLabel):
+ clicked = Signal() # Signal to emit when the label is clicked
+ def __init__(
+ self,
+ icon: Union[QIcon, QPixmap, str],
+ size: int = 24,
+ invert_colors: bool = True,
+ parent=None
+ ):
+ """
+ Create a clickable icon label.
+ The icon can be: QIcon, QPixmap, or a string with the path to the image.
+ For example:
+ icon = QApplication.style().standardIcon(QStyle.SP_FileIcon)
+ Parameters
+ ----------
+ icon : QIcon, QPixmap, or str
+ The icon to display
+ parent : QWidget, optional
+ The parent widget, by default None
+ """
+ super().__init__(parent)
+ # Store the size for use in scaling the icon:
+ self.size = size
+ # Convert the icon to QIcon if it is a string:
+ if isinstance(icon, str):
+ icon = QIcon(icon)
+ # Convert the icon to QPixmap if it is a QIcon or use it directly if it's already a QPixmap:
+ if isinstance(icon, QIcon):
+ # Get the pixmap from the icon, scaled isotropically:
+ pixmap = icon.pixmap(self.size, self.size)
+ elif isinstance(icon, QPixmap):
+ pixmap = icon.scaled(self.size,
+ self.size,
+ Qt.KeepAspectRatio,
+ Qt.SmoothTransformation)
+ # Invert colors if requested:
+ if invert_colors:
+ pixmap = self._modify_pixmap_for_dark_ui(pixmap)
+ # If the icon is a QPixmap, use it directly:
+ self.setPixmap(pixmap)
+ # Change cursor to hand pointer when hovering over the label:
+ self.setCursor(Qt.PointingHandCursor)
+ def mousePressEvent(self, event):
+ if event.button() == Qt.LeftButton:
+ self.clicked.emit()
+ @staticmethod
+ def _modify_pixmap_for_dark_ui(pixmap):
+ # Convert QPixmap to QImage
+ image = pixmap.toImage()
+ # Ensure image supports alpha channel:
+ image = image.convertToFormat(QImage.Format_ARGB32)
+ # Access image pixels to invert colors and rotate hue, preserving transparency
+ for x in range(image.width()):
+ for y in range(image.height()):
+ # Get the current pixel's color with alpha channel
+ color = QColor(image.pixel(x, y))
+ # Invert colors
+ color.setRed(255 - color.red())
+ color.setGreen(255 - color.green())
+ color.setBlue(255 - color.blue())
+ # # Rotate hue by 180 degrees
+ # if (
+ # color.hue() != -1
+ # ): # Check if color is not grayscale (hue is undefined for grayscale)
+ # hue = (color.hue() + 180) % 360
+ # color.setHsv(
+ # hue, color.saturation(), color.value(), color.alpha()
+ # ) # Preserve alpha
+ #
+ # # Set the modified color back to the image
+ # image.setPixel(
+ # x, y, color.rgba()
+ # ) # Use rgba() to include the alpha channel
+ # Convert QImage back to QPixmap
+ modified_pixmap = QPixmap.fromImage(image)
+ return modified_pixmap
diff --git a/src/microplugin/code_editor/code_drop_send_widget.py b/src/microplugin/code_editor/code_drop_send_widget.py
new file mode 100644
index 0000000..4f946b0
--- /dev/null
+++ b/src/microplugin/code_editor/code_drop_send_widget.py
@@ -0,0 +1,197 @@
+from typing import Callable, Tuple
+from arbol import aprint
+from qtpy.QtCore import QTimer
+from qtpy.QtWidgets import QWidget, QPushButton, QComboBox, \
+ QSizePolicy, QHBoxLayout
+from microplugin.network.code_drop_client import CodeDropClient
+class CodeDropSendWidget(QWidget):
+ def __init__(self,
+ code_drop_client: CodeDropClient,
+ max_height: int = 50,
+ margin: int = 0,
+ refresh_interval: int = 1):
+ super().__init__()
+ # Code Drop Client:
+ self.code_drop_client = code_drop_client
+ # Initialize widgets:
+ self.initUI(max_height=max_height,
+ margin=margin)
+ # Refresh interval, converting from seconds to milliseconds:
+ self.refresh_interval = refresh_interval * 1000
+ # field for timer:
+ self.timer = None
+ def initUI(self, max_height: int,
+ margin: int):
+ # Layout:
+ layout = QHBoxLayout(self)
+ # Server selection dropdown
+ self.username_address_port_combo_box = QComboBox()
+ layout.addWidget(self.username_address_port_combo_box)
+ # Send message button
+ self.send_button = QPushButton('Send')
+ self.send_button.clicked.connect(self.send_code)
+ self.send_button.setSizePolicy(QSizePolicy.Maximum,
+ QSizePolicy.Maximum) # Adjust size policy
+ layout.addWidget(self.send_button)
+ # Cancel message button
+ self.cancel_button = QPushButton('Cancel')
+ self.cancel_button.clicked.connect(self.canceled)
+ self.cancel_button.setSizePolicy(QSizePolicy.Maximum,
+ QSizePolicy.Maximum) # Adjust size policy
+ layout.addWidget(self.cancel_button)
+ # Set the layout margins:
+ layout.setContentsMargins(margin,
+ margin,
+ margin,
+ margin)
+ # Set the layout:
+ self.setLayout(layout)
+ # Set the vertical size policy to Minimum so it takes the least vertical space
+ self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Minimum)
+ # Attempt to directly control the widget's size
+ self.setMaximumHeight(max_height) # Adjust 100 to your needs
+ # Hide the widget initially:
+ self.hide()
+ def update_server_list(self):
+ def _identifier(username_address_port):
+ if not username_address_port:
+ return None
+ username, address, port = username_address_port
+ return f'{username}:{address}:{str(port)}'
+ # Assume that the combination of address and port is unique for each server and remains constant.
+ # Save the current selection based on a unique identifier (e.g., address and port):
+ current_selection = self.username_address_port_combo_box.currentData()
+ current_identifier = _identifier(current_selection) if current_selection else None
+ # Clear the combo box before updating:
+ self.username_address_port_combo_box.clear()
+ # Re-populate the combo box with updated server list:
+ for key, username_address_port in self.code_drop_client.servers.items():
+ string_to_display = f"{username_address_port[0]} at {key} ({username_address_port[1]}:{username_address_port[2]})"
+ # Add the server to the combo box, using the identifier as the item data for easy lookup:
+ self.username_address_port_combo_box.addItem(string_to_display,
+ username_address_port)
+ # Restore the current selection by finding the item with the matching unique identifier:
+ if current_identifier:
+ for index in range(self.username_address_port_combo_box.count()):
+ item_data = self.username_address_port_combo_box.itemData(
+ index)
+ item_identifier = _identifier(item_data)
+ if item_identifier == current_identifier:
+ self.username_address_port_combo_box.setCurrentIndex(
+ index)
+ break
+ else:
+ # If there was no selection or the selected server is no longer available, default to the first item:
+ self.username_address_port_combo_box.setCurrentIndex(0)
+ def send_code(self):
+ # Get the selected server address and port:
+ username_address_port = self.username_address_port_combo_box.currentData()
+ if username_address_port:
+ # Split address from port:
+ username, server_address, server_port = username_address_port
+ # Logging":
+ aprint(
+ f"Sending code to {username} at {server_address}:{server_port}.")
+ # Get the filename and code:
+ filename, code = self.get_code_callable()
+ if filename is not None and code is not None:
+ # Send message:
+ self.code_drop_client.send_code_message(server_address=server_address,
+ server_port=server_port,
+ filename=filename,
+ code=code)
+ else:
+ aprint("No code to send, no code could be obtained.")
+ else:
+ aprint("Please select a recipient.")
+ # To avoid code duplication, we call canceled to hide widget and stop discovery:
+ self.canceled()
+ def canceled(self):
+ # Hide:
+ self.hide()
+ # Disable discovery worker:
+ self.code_drop_client.discover_worker.is_enabled = False
+ # Stop refreshing server list:
+ self.stop_server_list_refresh()
+ def start_server_list_refresh(self):
+ # Initial refresh:
+ self.update_server_list()
+ # Start the timer:
+ self.timer = QTimer(self)
+ self.timer.timeout.connect(self.update_server_list)
+ self.timer.start(self.refresh_interval)
+ # Ensure the timer is stopped when the widget is closed:
+ self.destroyed.connect(self.stop_server_list_refresh)
+ def stop_server_list_refresh(self):
+ # Stop the timer:
+ if self.timer:
+ self.timer.stop()
+ self.timer.deleteLater()
+ self.timer=None
+ def show_send_dialog(self,
+ get_code_callable: Callable[[], Tuple[str, str]]):
+ # Store the filename and code:
+ self.get_code_callable = get_code_callable
+ # Enable discovery worker:
+ self.code_drop_client.discover_worker.is_enabled = True
+ # Start refreshing server list:
+ self.start_server_list_refresh()
+ # show widget:
+ self.show()
+ def stop(self):
+ # Stop refreshing server list:
+ self.stop_server_list_refresh()
+ def close(self):
+ self.stop()
+ super().close()
diff --git a/src/microplugin/code_editor/code_snippet_editor_widget.py b/src/microplugin/code_editor/code_snippet_editor_widget.py
new file mode 100644
index 0000000..9555dc3
--- /dev/null
+++ b/src/microplugin/code_editor/code_snippet_editor_widget.py
@@ -0,0 +1,890 @@
+import json
+import os
+import sys
+from datetime import datetime
+from typing import Optional
+import qtawesome
+from arbol import aprint
+from qtpy.QtCore import Qt
+from qtpy.QtGui import QFontMetrics
+from qtpy.QtWidgets import (
+ QApplication,
+ QSplitter,
+ QListWidget,
+ QWidget,
+ QMainWindow,
+ QVBoxLayout,
+ QToolBar,
+ QMenu,
+ QAction, QSizePolicy, QListWidgetItem, )
+from microplugin.code_editor.text_dialog import TextDialog
+from microplugin.code_editor.clickable_icon import ClickableIcon
+from microplugin.code_editor.code_drop_send_widget import \
+ CodeDropSendWidget
+from microplugin.code_editor.console_widget import ConsoleWidget
+from microplugin.code_editor.python_code_editor_manager import \
+ MultiEditorManager
+from microplugin.code_editor.python_code_editor_widget import \
+ PythonCodeEditor
+from microplugin.code_editor.text_input_widget import TextInputWidget
+from microplugin.code_editor.yes_no_cancel_question_widget import \
+ YesNoCancelQuestionWidget
+from microplugin.formating.black_formating import format_code
+from microplugin.network.code_drop_client import CodeDropClient
+from microplugin.network.code_drop_server import CodeDropServer
+class CodeSnippetEditorWidget(QWidget):
+ def __init__(self,
+ folder_path: str,
+ variables: Optional[dict] = None,
+ parent=None):
+ """
+ Create a widget for editing Python code snippets.
+ Parameters
+ ----------
+ folder_path : str
+ The path to the folder containing the Python code snippets.
+ """
+ super().__init__(parent)
+ self.folder_path = folder_path
+ self.filename_to_displayname = {}
+ self.displayname_to_filename = {}
+ self.currently_open_filename = None
+ # Dictionary to hold undo stacks for each file
+ self.undo_stacks = {}
+ # Set the variables:
+ self.variables = variables or {}
+ # Start the network client and server:
+ self.client = CodeDropClient()
+ self.client.discover_worker.server_discovered.connect(
+ self.on_server_discovered)
+ self.client.start_discovering()
+ # Start the server:
+ self.server = CodeDropServer(self.server_message_received)
+ self.server.start_broadcasting()
+ self.server.start_receiving()
+ # is OpenAI available?
+ from napari_chatgpt.utils.api_keys.api_key import is_api_key_available
+ self.is_openai_available = is_api_key_available('OpenAI')
+ # Initialize the UI:
+ self.init_UI()
+ # Set the default model name for the language model:
+ self.llm_model_name = None
+ def init_UI(self):
+ main_layout = QVBoxLayout(self)
+ # Initialize the toolbar
+ self.toolbar = QToolBar("Main Toolbar")
+ main_layout.addWidget(self.toolbar)
+ # Icon color:
+ icon_color = '#5E636F'
+ # function to get icon from fontawesome:
+ def _get_icon(icon_name: str):
+ return qtawesome.icon(icon_name, color=icon_color)
+ # New file button with a standard icon:
+ new_file_clickable_icon = ClickableIcon(_get_icon('fa5s.file-alt'))
+ new_file_clickable_icon.setToolTip("New file")
+ self.toolbar.addWidget(new_file_clickable_icon)
+ new_file_clickable_icon.clicked.connect(self.new_file_dialog)
+ # Duplicate file button with a custom icon:
+ duplicate_file_clickable_icon = ClickableIcon(_get_icon('fa5s.copy'))
+ duplicate_file_clickable_icon.setToolTip("Duplicate file")
+ self.toolbar.addWidget(duplicate_file_clickable_icon)
+ duplicate_file_clickable_icon.clicked.connect(self.duplicate_file)
+ # Delete file button with a custom icon:
+ delete_file_clickable_icon = ClickableIcon(_get_icon('fa5s.trash-alt'))
+ delete_file_clickable_icon.setToolTip("Delete file")
+ self.toolbar.addWidget(delete_file_clickable_icon)
+ delete_file_clickable_icon.clicked.connect(self.delete_file)
+ # Clean and reformat code in file button with a custom icon:
+ clean_file_clickable_icon = ClickableIcon(_get_icon('fa5s.hand-sparkles'))
+ clean_file_clickable_icon.setToolTip("Clean and reformat code")
+ self.toolbar.addWidget(clean_file_clickable_icon)
+ clean_file_clickable_icon.clicked.connect(self.clean_and_reformat_current_file)
+ # Check if the OpenAI API key is available:
+ if self.is_openai_available:
+ # Check if the file is 'safe':
+ check_code_safety_clickable_icon = ClickableIcon(_get_icon('fa5s.virus-slash'))
+ check_code_safety_clickable_icon.setToolTip("Check code safety")
+ self.toolbar.addWidget(check_code_safety_clickable_icon)
+ check_code_safety_clickable_icon.clicked.connect(
+ self.check_code_safety_with_AI)
+ # Improve code in file button with a custom icon:
+ comment_code_clickable_icon = ClickableIcon(_get_icon('fa5s.edit'))
+ comment_code_clickable_icon.setToolTip("Improve code comments and explanations")
+ self.toolbar.addWidget(comment_code_clickable_icon)
+ comment_code_clickable_icon.clicked.connect(self.comment_code_with_AI)
+ # Use AI to change code based on prompt:
+ modify_code_clickable_icon = ClickableIcon(_get_icon('fa5s.robot'))
+ modify_code_clickable_icon.setToolTip("Modify code with AI")
+ self.toolbar.addWidget(modify_code_clickable_icon)
+ modify_code_clickable_icon.clicked.connect(self.modify_code_with_AI)
+ # Send file button with a custom icon:
+ send_file_clickable_icon = ClickableIcon(_get_icon('fa5s.wifi'))
+ send_file_clickable_icon.setToolTip("Send file")
+ self.toolbar.addWidget(send_file_clickable_icon)
+ send_file_clickable_icon.clicked.connect(self.send_current_file)
+ # Create a spacer widget and set its size policy to expanding
+ spacer = QWidget()
+ spacer.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Preferred)
+ self.toolbar.addWidget(spacer)
+ # Run file button with a custom icon:
+ run_file_clickable_icon = ClickableIcon(_get_icon('fa5s.play-circle'))
+ run_file_clickable_icon.setToolTip("Run file")
+ self.toolbar.addWidget(run_file_clickable_icon)
+ run_file_clickable_icon.clicked.connect(self.run_current_file)
+ # Splitter for the list widget and the code editor:
+ self.splitter = QSplitter(Qt.Horizontal)
+ # List widget for the file names:
+ self.list_widget = QListWidget()
+ self.list_widget.setContextMenuPolicy(Qt.CustomContextMenu)
+ self.list_widget.customContextMenuRequested.connect(
+ self.show_context_menu)
+ # Code editor widget:
+ self.editor_manager = MultiEditorManager(self.on_text_modified)
+ # Add widgets to the splitter:
+ self.splitter.addWidget(self.list_widget)
+ self.splitter.addWidget(self.editor_manager)
+ # Add the splitter to the main layout:
+ main_layout.addWidget(self.splitter)
+ # Add YesNoCancelQuestionWidget to the main layout:
+ self.yes_no_cancel_question_widget = YesNoCancelQuestionWidget()
+ main_layout.addWidget(self.yes_no_cancel_question_widget)
+ # Add TextInputQuestionWidget to the main layout:
+ self.text_input_widget = TextInputWidget()
+ main_layout.addWidget(self.text_input_widget)
+ # Add CodeDropSendWidget to the main layout:
+ self.code_drop_send_widget = CodeDropSendWidget(self.client)
+ main_layout.addWidget(self.code_drop_send_widget)
+ # Add ConsoleWidget to the main layout:
+ self.console_widget = ConsoleWidget()
+ main_layout.addWidget(self.console_widget)
+ # Set the layout for the main widget:
+ self.setLayout(main_layout)
+ # Connect signals and slots:
+ self.list_widget.currentItemChanged.connect(
+ self.current_list_item_changed)
+ self.populate_list()
+ def show_context_menu(self, position):
+ # Create the context menu:
+ context_menu = QMenu(self)
+ # Instantiate actions for the context menu:
+ rename_action = QAction("Rename", self)
+ duplicate_action = QAction("Duplicate", self)
+ delete_action = QAction("Delete", self)
+ open_in_system = QAction("Open in system", self)
+ find_in_system = QAction("Find in system", self)
+ # Instantiate AI actions for the context menu:
+ if self.is_openai_available:
+ clean_action = QAction("Clean", self)
+ check_action = QAction("Check", self)
+ comment_action = QAction("Comment", self)
+ modify_action = QAction("Modify", self)
+ # Add actions to the context menu:
+ context_menu.addAction(rename_action)
+ context_menu.addAction(duplicate_action)
+ context_menu.addAction(delete_action)
+ context_menu.addAction(open_in_system)
+ context_menu.addAction(find_in_system)
+ # Add AI actions to the context menu:
+ if self.is_openai_available:
+ context_menu.addAction(clean_action)
+ context_menu.addAction(check_action)
+ context_menu.addAction(comment_action)
+ context_menu.addAction(modify_action)
+ # Connect the actions to the corresponding slots:
+ rename_action.triggered.connect(self.rename_file)
+ duplicate_action.triggered.connect(self.duplicate_file)
+ delete_action.triggered.connect(self.delete_file_from_context_menu)
+ open_in_system.triggered.connect(self.open_file_in_system)
+ find_in_system.triggered.connect(self.find_file_in_system)
+ # Connect AI actions to the corresponding slots:
+ if self.is_openai_available:
+ clean_action.triggered.connect(self.clean_and_reformat_current_file)
+ check_action.triggered.connect(self.check_code_safety_with_AI)
+ comment_action.triggered.connect(self.comment_code_with_AI)
+ modify_action.triggered.connect(self.modify_code_with_AI)
+ # Show the context menu:
+ context_menu.exec_(self.list_widget.mapToGlobal(position))
+ def on_server_discovered(self, server_name, server_address, port_number):
+ aprint(
+ f"Discovered server: {server_name} at {server_address}:{port_number}")
+ def server_message_received(self, addr, message):
+ # Get IP address and port number:
+ ip_address, port = addr
+ aprint(
+ f"Message of length: {len(message)} received from {ip_address}:{port}")
+ # Parse the message in JSON format:
+ message_dict = json.loads(message)
+ hostname = message_dict['hostname']
+ username = message_dict['username']
+ filename = message_dict['filename']
+ code = message_dict['code']
+ # Prepend a comment line to the code that gives the date and time it was recieved and from where?
+ code = f"# File '{filename}' of length: {len(code)} received from {username} at {hostname} ({ip_address}:{port}) at {datetime.now()}\n" + code
+ # Ask for confirmation:
+ message = f"Accept file '{filename}' sent by {username} at {hostname} ({ip_address}:{port})?"
+ def _accept_file():
+ self.new_file(filename=filename,
+ code=code,
+ postfix_if_exists='received')
+ # Show the question widget:
+ self.yes_no_cancel_question_widget.show_question(message=message,
+ yes_callback=_accept_file,
+ cancel_text=None)
+ def populate_list(self, selected_filename: Optional[str] = None):
+ # Clear the list widget and dictionaries:
+ self.list_widget.clear()
+ self.filename_to_displayname.clear()
+ self.displayname_to_filename.clear()
+ self.currently_open_filename = None
+ # Get the list of files in the folder:
+ file_list = os.listdir(self.folder_path)
+ # Sort the list of files:
+ file_list.sort()
+ # Reset the index for duplicated display names:
+ display_name_index = 0
+ # Populate the list widget with the Python files in the folder:
+ max_width = 0
+ fm = QFontMetrics(self.list_widget.font())
+ for filename in file_list:
+ if filename.endswith(".py"):
+ # Add the file to the list widget:
+ display_name = self.truncate_filename(filename)
+ # If there is already a file with the same display name, add a number to the display name:
+ if display_name in self.displayname_to_filename:
+ display_name_index += 1
+ display_name = self.truncate_filename(filename, display_name_index)
+ else:
+ display_name_index = 0
+ # Add the file to the dictionaries:
+ self.filename_to_displayname[filename] = display_name
+ self.displayname_to_filename[display_name] = filename
+ # Create a QListWidgetItem for the file:
+ item = QListWidgetItem(display_name)
+ # Set the tooltip to the full filename if it has been truncated:
+ if display_name != filename:
+ item.setToolTip(filename)
+ # Add the item to the list widget:
+ self.list_widget.addItem(item)
+ # Update the maximum width:
+ item_width = fm.width(display_name) + 20
+ max_width = max(max_width, item_width)
+ # Ensure that the list widget is wide enough to show the longest file name:
+ self.list_widget.setMaximumWidth(max_width)
+ # Ensure that the list widget cannot be too small
+ self.list_widget.setMinimumWidth(fm.width('some_file.py') + 20)
+ # Ensure that the selected file is loaded:
+ if selected_filename:
+ # Make sure to select the right row:
+ for i in range(self.list_widget.count()):
+ if self.list_widget.item(i).text() == self.filename_to_displayname.get(selected_filename, ""):
+ self.list_widget.setCurrentRow(i)
+ self.load_snippet_by_filename(selected_filename)
+ else:
+ # Otherwise select the first file in the list if there is one using load_snippet_by_filename:
+ if self.list_widget.count() > 0:
+ self.list_widget.setCurrentRow(0)
+ self.load_snippet()
+ def truncate_filename(self,
+ filename: str,
+ index: Optional[int] = None,
+ max_length: int = 40) -> str:
+ # Convert index to string if it is not None:
+ if index is not None:
+ index_str = str(index)
+ else:
+ index_str = ""
+ # Truncate the filename if it is too long:
+ if len(filename) > max_length:
+ extension = filename.split(".")[-1]
+ base_length = max_length - len(extension) - 3
+ return f"{filename[:base_length]}…{index_str}.{extension}"
+ return filename
+ def on_text_modified(self):
+ # Save the file when the text is modified:
+ self.save_current_file()
+ def load_snippet(self):
+ # Get the current item:
+ current = self.list_widget.currentItem()
+ # If there is a current item:
+ if current:
+ # Get the filename from the display name:
+ display_name = current.text()
+ filename = self.displayname_to_filename.get(display_name, "")
+ # Load the snippet by filename:
+ self.load_snippet_by_filename(filename)
+ def load_snippet_by_filename(self, filename):
+ # Switch editor to the file:
+ self.editor_manager.switch_to(filename)
+ # Set the currently open filename:
+ self.currently_open_filename = filename
+ # Load the snippet from the file:
+ with open(os.path.join(self.folder_path, filename), "r") as file:
+ self.editor_manager.current_editor.setPlainTextUndoable(file.read())
+ def save_current_file(self):
+ # If there is a currently open file, save it:
+ if self.currently_open_filename:
+ # Make sure that there is an editor <=> at least one file in the list:
+ if not self.editor_manager.current_editor:
+ return
+ # Get the full path to the file:
+ full_path = os.path.join(self.folder_path,
+ self.currently_open_filename)
+ # Save the file:
+ with open(full_path, "w") as file:
+ file.write(self.editor_manager.current_editor.toPlainText())
+ def new_file(self,
+ filename: str,
+ code: Optional[str] = "",
+ postfix_if_exists = '_copy'):
+ # Make sure the file has '.py' extension:
+ if not filename.endswith(".py"):
+ filename = f"{filename}.py"
+ # Full path to the file:
+ full_path = os.path.join(self.folder_path, filename)
+ # Make sure the file does not already exist:
+ while os.path.exists(full_path):
+ # Get filename from fullpath:
+ _, filename = os.path.split(full_path)
+ # Get the filename without extension:
+ base_name, ext = os.path.splitext(filename)
+ # If the filename already contains the postfix, the we check if there is number after the postfix:
+ if postfix_if_exists in base_name:
+ # Get the number after the postfix:
+ postfix_number = base_name.split(postfix_if_exists)[-1]
+ try:
+ postfix_number = int(postfix_number)
+ except ValueError:
+ postfix_number = 0
+ # Increase the number and add it to the base name:
+ base_name = base_name.split(postfix_if_exists)[0]
+ base_name = f"{base_name}{postfix_if_exists}{postfix_number + 1}"
+ # Change the filename to avoid overwriting the existing file:
+ filename = f"{base_name}{ext}"
+ else:
+ # Add the postfix to the base name:
+ base_name = f"{base_name}{postfix_if_exists}"
+ # Change the filename to avoid overwriting the existing file:
+ filename = f"{base_name}{ext}"
+ # Update the full path:
+ full_path = os.path.join(self.folder_path, filename)
+ # Create the file and write the text to it:
+ with open(full_path, "w") as file:
+ file.write(code)
+ # Repopulate the list:
+ self.populate_list(filename)
+ def new_file_dialog(self):
+ def _new_file(filename_text: str):
+ # if no file is selected but the editor has contents, use it as the new file's content:
+ if self.editor_manager.current_editor:
+ current_editor = self.editor_manager.current_editor
+ current_text_in_editor = current_editor.toPlainText()
+ if self.currently_open_filename is None and current_text_in_editor and len(
+ current_text_in_editor) > 0:
+ code = current_editor.toPlainText()
+ else:
+ code = ""
+ else:
+ code = ""
+ self.new_file(filename=filename_text, code=code)
+ # Show the text input widget:
+ self.text_input_widget.show_input(
+ message="Enter file name:",
+ enter_text="Create",
+ cancel_text="Cancel",
+ enter_callback=_new_file,
+ cancel_callback=None,
+ do_after_callable=None
+ )
+ def duplicate_file(self):
+ # get current item:
+ current_item = self.list_widget.currentItem()
+ if current_item:
+ # Save current file:
+ self.save_current_file()
+ # Get original filename and display name:
+ original_display_name = current_item.text()
+ original_filename = self.displayname_to_filename[
+ original_display_name]
+ # get base name and extension:
+ base_name, ext = os.path.splitext(original_filename)
+ # New filename:
+ new_filename = f"{base_name}_copy{ext}"
+ # new_display_name = f"{base_name}_copy"
+ # Ensure the new filename does not already exist
+ counter = 1
+ while os.path.exists(os.path.join(self.folder_path, new_filename)):
+ counter += 1
+ new_filename = f"{base_name}_copy{counter}{ext}"
+ # new_display_name = f"{base_name}_copy{counter}"
+ # Copy the file on disk
+ with open(
+ os.path.join(self.folder_path, original_filename), "r"
+ ) as original_file:
+ with open(
+ os.path.join(self.folder_path, new_filename), "w"
+ ) as new_file:
+ new_file.write(original_file.read())
+ # Update the list and dictionaries:
+ self.populate_list(new_filename)
+ def delete_file_from_context_menu(self):
+ # Get current item:
+ current_item = self.list_widget.currentItem()
+ # If there is a current item:
+ if current_item:
+ # Get the filename from the display name:
+ filename_to_delete = self.displayname_to_filename[
+ current_item.text()]
+ # Delete the file:
+ self.delete_file(filename_to_delete)
+ def delete_file(self, filename=None):
+ # Determine which file to delete:
+ file_to_delete = filename or self.currently_open_filename
+ # If there is a file to delete:
+ if file_to_delete:
+ # Ask for confirmation:
+ message = f"Are you sure you want to delete file {file_to_delete}?"
+ def _delete_file():
+ # Get the full path to the file:
+ file_to_delete_path = os.path.join(self.folder_path,
+ file_to_delete)
+ # remove file:
+ os.remove(file_to_delete_path)
+ # Refresh the list after deletion:
+ self.populate_list()
+ if self.list_widget.count() > 0:
+ self.list_widget.setCurrentRow(0)
+ self.load_snippet()
+ else:
+ self.editor_manager.current_editor.clear()
+ self.currently_open_filename = None
+ # Show the question widget:
+ self.yes_no_cancel_question_widget.show_question(message=message,
+ yes_callback=_delete_file,
+ cancel_text=None)
+ def rename_file(self):
+ # Get current line:
+ current_item = self.list_widget.currentItem()
+ if current_item:
+ # Save current file:
+ self.save_current_file()
+ # Get original filename and display name:
+ old_display_name = current_item.text()
+ old_filename = self.displayname_to_filename[old_display_name]
+ def _rename_file(new_name: str):
+ # New filename:
+ new_filename = f"{new_name}.py"
+ # Rename file on disk
+ os.rename(
+ os.path.join(self.folder_path, old_filename),
+ os.path.join(self.folder_path, new_filename),
+ )
+ # Update the list and dictionaries:
+ self.populate_list(new_filename)
+ self.text_input_widget.show_input(
+ message="Enter new file name:",
+ placeholder_text="Enter new name here",
+ default_text=old_filename[:-3],
+ enter_text="Rename",
+ cancel_text="Cancel",
+ enter_callback=_rename_file,
+ cancel_callback=None,
+ do_after_callable=None
+ )
+ # # Ask for new name:
+ # new_name, ok = QInputDialog.getText(
+ # self, "Rename file", "New name:", text=old_filename[:-3]
+ # ) # Exclude '.py' extension
+ def clean_and_reformat_current_file(self):
+ if self.currently_open_filename:
+ # Make sure that there is an editor <=> at least one file in the list:
+ if not self.editor_manager.current_editor:
+ return
+ # Get the code from the editor:
+ code = self.editor_manager.current_editor.toPlainText()
+ # Uses black to format the code:
+ formatted_code = format_code(code)
+ # Set the formatted code in the editor:
+ self.editor_manager.current_editor.setPlainTextUndoable(formatted_code)
+ # Save the file after improving it:
+ self.save_current_file()
+ def open_file_in_system(self):
+ # Get current item:
+ current_item = self.list_widget.currentItem()
+ if current_item:
+ # Get the filename from the display name:
+ filename_to_open = self.displayname_to_filename[
+ current_item.text()]
+ # Open the file in the system for different OS:
+ # First OSX:
+ if sys.platform == "darwin":
+ os.system(f'open {os.path.join(self.folder_path, filename_to_open)}')
+ # Then Windows:
+ elif sys.platform == "win32" or sys.platform == "cygwin" or sys.platform == "msys" or sys.platform == "win64":
+ os.system(f'start {os.path.join(self.folder_path, filename_to_open)}')
+ # Then Linux:
+ else:
+ os.system(f'xdg-open {os.path.join(self.folder_path, filename_to_open)}')
+ def find_file_in_system(self):
+ # Open the folder in the system for different OS:
+ # First OSX:
+ if sys.platform == "darwin":
+ os.system(f'open {self.folder_path}')
+ # Then Windows:
+ elif sys.platform == "win32" or sys.platform == "cygwin" or sys.platform == "msys" or sys.platform == "win64":
+ os.system(f'start {self.folder_path}')
+ # Then Linux:
+ else:
+ os.system(f'xdg-open {self.folder_path}')
+ def check_code_safety_with_AI(self):
+ if self.currently_open_filename:
+ # Make sure that there is an editor <=> at least one file in the list:
+ if not self.editor_manager.current_editor:
+ return
+ # Get the code from the editor:
+ code = self.editor_manager.current_editor.toPlainText()
+ # Check the code for safety by calling ChatGPT with a custom prompt:
+ from napari_chatgpt.utils.python.check_code_safety import \
+ check_code_safety
+ response, safety_rank = check_code_safety(code,
+ model_name=self.llm_model_name,
+ verbose=True)
+ # Create the icons for the dialog:
+ InformationIcon = qtawesome.icon('fa5s.info-circle')
+ WarningIcon = qtawesome.icon('fa5s.exclamation-triangle')
+ # Create and show the custom dialog
+ dialog = TextDialog(
+ f"Code Safety Report for: {self.currently_open_filename}",
+ response,
+ InformationIcon if safety_rank in ['A', 'B'] else WarningIcon
+ )
+ # Show the dialog:
+ dialog.exec_()
+ def comment_code_with_AI(self):
+ if self.currently_open_filename:
+ # Make sure that there is an editor <=> at least one file in the list:
+ if not self.editor_manager.current_editor:
+ return
+ # Get the code from the editor:
+ code = self.editor_manager.current_editor.toPlainText()
+ # Add comments to the code:
+ from napari_chatgpt.utils.python.add_comments import add_comments
+ code = add_comments(code, model_name=self.llm_model_name, verbose=True)
+ # Set the commented code in the editor:
+ self.editor_manager.current_editor.setPlainTextUndoable(code)
+ def modify_code_with_AI(self):
+ if self.currently_open_filename:
+ def _modify_code(request: str):
+ # If there is no currently open file, return:
+ if not self.currently_open_filename:
+ return
+ # Get the code from the editor:
+ code = self.editor_manager.current_editor.toPlainText()
+ # Modify the code based on the request:
+ from napari_chatgpt.utils.python.modify_code import modify_code
+ modified_code = modify_code(code=code,
+ request=request,
+ model_name=self.llm_model_name,
+ verbose=True)
+ # Request without new line characters:
+ request_nonl = request.replace('\n', ' ')
+ # Add comment to the code that explains what as changed:
+ modified_code = f"# Code modified by Omega at {datetime.now()}.\n# Request:{request_nonl}.\n\n{modified_code}"
+ # Set the commented code in the editor:
+ self.editor_manager.current_editor.setPlainTextUndoable(modified_code)
+ placeholder_text = ("Explain how you you want to modify the code of the currently selected file.\n"
+ "For example:\n"
+ " 'Make a widget from this code',\n"
+ " 'Make the code work for 3d stacks',\n"
+ " etc...\n"
+ "You can also place 'TODO's or 'FIXME' in the code, in that case no prompt is required.\n"
+ "Finally, you can undo the changes with CTRL+Z.\n")
+ # Show the text input widget:
+ self.text_input_widget.show_input(
+ message="Prompt:",
+ placeholder_text=placeholder_text,
+ enter_text="Modify",
+ cancel_text="Cancel",
+ enter_callback=_modify_code,
+ cancel_callback=None,
+ do_after_callable=None,
+ multi_line=True,
+ max_height=200
+ )
+ def send_current_file(self):
+ # Send the file if there is a currently open file:
+ if self.currently_open_filename:
+ def _get_current_code_and_filename():
+ # Get the code from the editor:
+ if self.editor_manager.current_editor:
+ code = self.editor_manager.current_editor.toPlainText()
+ return self.currently_open_filename, code
+ return None, None
+ # Show the send dialog:
+ self.code_drop_send_widget.show_send_dialog(
+ get_code_callable=_get_current_code_and_filename
+ )
+ def run_current_file(self):
+ # Run the file if there is a currently open file:
+ if self.currently_open_filename:
+ # Save the file before running it:
+ self.save_current_file()
+ # Make sure that there is an editor <=> at least one file in the list:
+ if not self.editor_manager.current_editor:
+ return
+ # Get code from the editor:
+ code = self.editor_manager.current_editor.toPlainText()
+ try:
+ # Local import to avoid circular import:
+ from napari_chatgpt.utils.python.dynamic_import import \
+ execute_as_module
+ # Run the code as a module:
+ captured_output = execute_as_module(code, **self.variables)
+ # Show the output in the console:
+ self.console_widget.append_message(captured_output)
+ aprint(f"Tool completed task successfully:\n {captured_output}")
+ except Exception as e:
+ aprint(f"Error running file: {e}")
+ import traceback
+ traceback.print_exc()
+ # String that contains stacktrace:
+ captured_stacktrace = traceback.format_exc()
+ # Show the output in the console:
+ self.console_widget.append_message(captured_stacktrace, message_type='error')
+ def current_list_item_changed(self, current, previous):
+ # If the current item is different from the previous one, load the snippet:
+ if current and (previous == None or current.text != previous.text()):
+ # Load the snippet:
+ self.load_snippet()
+ def close(self):
+ # Stop the server:
+ self.server.stop()
+ # Stop the client:
+ self.client.stop()
+ # Close the manager:
+ self.editor_manager.close()
+ # Close the widgets:
+ self.yes_no_cancel_question_widget.close()
+ self.text_input_widget.close()
+ self.code_drop_send_widget.close()
+ # Close the widget itself.
+ super().close()
+if __name__ == "__main__":
+ app = QApplication(sys.argv)
+ mainWindow = QMainWindow()
+ codeEditorWidget = CodeSnippetEditorWidget("")
+ mainWindow.setCentralWidget(codeEditorWidget)
+ mainWindow.setWindowTitle("Python Code Snippet Editor")
+ mainWindow.resize(800, 600)
+ mainWindow.show()
+ sys.exit(app.exec_())
diff --git a/src/microplugin/code_editor/code_snippet_editor_window.py b/src/microplugin/code_editor/code_snippet_editor_window.py
new file mode 100644
index 0000000..e5a364c
--- /dev/null
+++ b/src/microplugin/code_editor/code_snippet_editor_window.py
@@ -0,0 +1,80 @@
+import sys
+from typing import Tuple, Optional
+from qtpy.QtCore import Qt
+from qtpy.QtWidgets import QApplication
+from qtpy.QtWidgets import QMainWindow
+from microplugin.code_editor.code_snippet_editor_widget import \
+ CodeSnippetEditorWidget
+# Enable High DPI display with PyQt5
+if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
+ QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)
+if hasattr(Qt, 'AA_EnableHighDpiScaling'):
+ QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
+class CodeSnippetEditorWindow(QMainWindow):
+ def __init__(self,
+ folder_path: str,
+ title: Optional[str] = "Python Code Snippet Editor",
+ size: Optional[Tuple[int, int]] = None,
+ variables: Optional[dict] = None,
+ parent=None,
+ *args, **kwargs):
+ """
+ Create a main window for the Python code snippet editor.
+ Parameters
+ ----------
+ folder_path : str
+ The path to the folder containing the Python code snippets.
+ args : list
+ Positional arguments to pass to the parent class.
+ kwargs : dict
+ Keyword arguments to pass to the parent class.
+ """
+ super(CodeSnippetEditorWindow, self).__init__(parent=parent,
+ *args,
+ **kwargs)
+ # Create the code snippet editor widget:
+ self.code_editor_widget = CodeSnippetEditorWidget(folder_path,
+ variables=variables,
+ parent=self)
+ self.setCentralWidget(self.code_editor_widget)
+ # get this machine's hostname:
+ server_hostname = self.code_editor_widget.server.server_hostname
+ # Get the server port:
+ server_port = self.code_editor_widget.server.server_port
+ # Shorten the folder path to the last two directories:
+ splitted_folder_path = folder_path.split('/')
+ if len(splitted_folder_path) > 2:
+ folder_path = '...' + '/'.join(splitted_folder_path[-2:])
+ # Set the window title:
+ self.setWindowTitle(
+ f"{title} - {folder_path} - {server_hostname}:{server_port}")
+ # Set the window size:
+ self.resize(size or self.code_editor_widget.sizeHint())
+ def close(self):
+ self.code_editor_widget.close()
+ super().close()
+if __name__ == "__main__":
+ app = QApplication(sys.argv)
+ mainWindow = CodeSnippetEditorWindow('', size=(800, 600))
+ mainWindow.show()
+ sys.exit(app.exec_())
diff --git a/src/microplugin/code_editor/console_widget.py b/src/microplugin/code_editor/console_widget.py
new file mode 100644
index 0000000..fd8eac2
--- /dev/null
+++ b/src/microplugin/code_editor/console_widget.py
@@ -0,0 +1,124 @@
+from qtpy.QtCore import Qt
+from qtpy.QtWidgets import QVBoxLayout, QWidget, QTextEdit, QApplication, \
+ QHBoxLayout, QPushButton
+class ConsoleWidget(QWidget):
+ def __init__(self,
+ margin: int = 0,
+ icon_size: int = 20,
+ parent=None):
+ super().__init__(parent=parent)
+ # This helps in auto-hiding when losing focus:
+ self.setWindowFlags(Qt.Popup)
+ # Initialize the UI
+ self.initUI(margin=margin,
+ icon_size=icon_size)
+ def initUI(self,
+ margin: int,
+ icon_size: int
+ ):
+ # Main layout
+ self.layout = QVBoxLayout()
+ # Top bar layout for the close and clear buttons
+ self.topBarLayout = QHBoxLayout()
+ # Set spacing between widgets in the layout to a smaller value
+ self.topBarLayout.setSpacing(1) # Adjust this value as needed
+ # Close and clear buttons
+ self.closeButton = QPushButton("X")
+ self.clearButton = QPushButton("C")
+ # Set the size of the buttons
+ self.closeButton.setFixedSize(icon_size, icon_size) # Make the button small
+ self.clearButton.setFixedSize(icon_size, icon_size) # Make the button small
+ # Connect the close button to the hide method
+ self.closeButton.clicked.connect(self.hide)
+ self.clearButton.clicked.connect(self.clear_console) # Connect clear button
+ # Add a spacer to push the close button to the right
+ self.topBarLayout.addStretch()
+ self.topBarLayout.addWidget(self.clearButton) # Add clear button
+ self.topBarLayout.addWidget(self.closeButton)
+ # Create a read-only QTextEdit widget to act as the console output
+ self.console_output = QTextEdit()
+ self.console_output.setReadOnly(True)
+ self.console_output.setLineWrapMode(QTextEdit.NoWrap)
+ # Add the top bar layout and the console output to the main layout
+ self.layout.addLayout(self.topBarLayout)
+ self.layout.addWidget(self.console_output)
+ # Set the layout margins:
+ self.layout.setContentsMargins(margin, margin, margin,
+ margin)
+ # Set the layout to the widget
+ self.setLayout(self.layout)
+ # Hide the widget initially:
+ self.hide()
+ def append_message(self, message: str, message_type: str = 'info'):
+ """
+ Append a message to the console output.
+ Args:
+ message (str): The message to append.
+ message_type (str): The type of message ('info', 'error', etc.). Can be used to format messages differently.
+ """
+ # Clean the message:
+ message = message.strip()
+ # if the message is empty, do nothing:
+ if len(message) == 0:
+ return
+ # Replace '\n' with '
' to display newlines in the QTextEdit:
+ message = message.replace('\n', '
+ # Replace '\t' with ' ' to display tabs in the QTextEdit:
+ message = message.replace('\t', ' ')
+ # Replace ' ' with ' ' to display spaces in the QTextEdit:
+ message = message.replace(' ', ' ')
+ if message_type == 'error':
+ message = f"{message}"
+ elif message_type == 'info':
+ message = f"{message}"
+ # Append the message to the console
+ self.console_output.append(message)
+ # show the widget:
+ self.show()
+ def clear_console(self):
+ """
+ Clear the console output.
+ """
+ self.console_output.clear()
+# Example usage
+if __name__ == "__main__":
+ import sys
+ app = QApplication(sys.argv)
+ consoleWidget = ConsoleWidget()
+ consoleWidget.show()
+ consoleWidget.append_message("This is an info message.", "info")
+ consoleWidget.append_message("This is an error message.", "error")
+ sys.exit(app.exec_())
diff --git a/src/microplugin/code_editor/python_code_completer.py b/src/microplugin/code_editor/python_code_completer.py
new file mode 100644
index 0000000..33cd8c2
--- /dev/null
+++ b/src/microplugin/code_editor/python_code_completer.py
@@ -0,0 +1,25 @@
+from jedi import Interpreter
+from qtpy.QtCore import QStringListModel, Qt
+from qtpy.QtWidgets import QCompleter
+class PythonCodeCompleter(QCompleter):
+ def __init__(self, parent=None):
+ super(PythonCodeCompleter, self).__init__(parent)
+ # Set the completion mode to PopupCompletion to display the completions in a popup:
+ self.setCompletionMode(QCompleter.PopupCompletion)
+ # Set the case sensitivity to case insensitive:
+ self.setCaseSensitivity(Qt.CaseInsensitive)
+ # Set the model to an empty QStringListModel:
+ self.interpreter = None
+ def updateCompletions(self, text):
+ if self.interpreter is None:
+ self.interpreter = Interpreter(text, [])
+ else:
+ self.interpreter = Interpreter(text, self.interpreter.namespaces)
+ completions = [c.name for c in self.interpreter.complete()]
+ self.setModel(QStringListModel(completions))
diff --git a/src/microplugin/code_editor/python_code_editor_manager.py b/src/microplugin/code_editor/python_code_editor_manager.py
new file mode 100644
index 0000000..dd73e17
--- /dev/null
+++ b/src/microplugin/code_editor/python_code_editor_manager.py
@@ -0,0 +1,59 @@
+from qtpy.QtWidgets import QWidget, QVBoxLayout
+from microplugin.code_editor.python_code_editor_widget import \
+ PythonCodeEditor
+class MultiEditorManager(QWidget):
+ def __init__(self,
+ on_text_modified_callback,
+ editor_widget_class: type[QWidget] = PythonCodeEditor,
+ parent=None):
+ super().__init__(parent)
+ self.on_text_modified_callback = on_text_modified_callback
+ self.editor_widget_class = editor_widget_class
+ self.layout = QVBoxLayout(self)
+ self.editors = {} # Key: filename, Value: PythonCodeEditor instance
+ self.current_editor_name = None
+ self.current_editor = None
+ def switch_to(self, filename):
+ # If we are already editing the filename, do nothing:
+ if filename == self.current_editor_name:
+ return
+ if self.current_editor:
+ # Disconnect the previous editor's textChanged signal
+ self.current_editor.textChanged.disconnect(self.on_text_modified_callback)
+ # Hide the previous editor:
+ self.current_editor.hide()
+ # If the editor for the filename does not exist, create it:
+ if filename not in self.editors:
+ editor = self.editor_widget_class(self)
+ self.editors[filename] = editor
+ self.layout.addWidget(editor)
+ # Set the current editor to the editor for the filename:
+ self.current_editor = self.editors[filename]
+ # Set the current editor name to the filename:
+ self.current_editor_name = filename
+ # Connect the current editor's textChanged signal to the callback:
+ self.current_editor.textChanged.connect(self.on_text_modified_callback)
+ # Show the current editor:
+ self.current_editor.show()
+ def close(self):
+ # Close all the editors:
+ for editor in self.editors.values():
+ editor.close()
+ # Close the parent widget
+ super().close()
diff --git a/src/microplugin/code_editor/python_code_editor_widget.py b/src/microplugin/code_editor/python_code_editor_widget.py
new file mode 100644
index 0000000..0b2b9ad
--- /dev/null
+++ b/src/microplugin/code_editor/python_code_editor_widget.py
@@ -0,0 +1,145 @@
+import jedi # Make sure jedi is installed
+from qtpy.QtCore import QStringListModel
+from qtpy.QtCore import Qt
+from qtpy.QtGui import QTextCursor
+from qtpy.QtWidgets import (
+ QPlainTextEdit,
+ QCompleter,
+from microplugin.code_editor.python_syntax_highlighting import \
+ PythonSyntaxHighlighter
+class PythonCodeEditor(QPlainTextEdit):
+ def __init__(self, parent=None):
+ super().__init__(parent)
+ # Tab Length Customization
+ self.tab_length = 4 # Default to 4 spaces
+ self.setTabStopDistance(self.fontMetrics().width(" ") * self.tab_length)
+ # Set placeholder text:
+ self.setPlaceholderText("Type your code here...")
+ # Syntax highlighter:
+ self.python_syntax_highlighter = PythonSyntaxHighlighter(
+ self.document())
+ # Completer setup:
+ self.completer = QCompleter(self)
+ self.completer.setWidget(self)
+ self.completer.setCompletionMode(QCompleter.PopupCompletion)
+ self.completer.setCaseSensitivity(Qt.CaseInsensitive)
+ self.completer.activated.connect(self.insertCompletion)
+ def textUnderCursor(self):
+ text_cursor = self.textCursor()
+ text_cursor.select(QTextCursor.WordUnderCursor)
+ return text_cursor.selectedText()
+ def insertCompletion(self, completion):
+ if self.completer.widget() != self:
+ return
+ tc = self.textCursor()
+ extra = len(completion) - len(self.completer.completionPrefix())
+ tc.movePosition(QTextCursor.Left)
+ tc.movePosition(QTextCursor.EndOfWord)
+ tc.insertText(completion[-extra:]) # insert the selected completion
+ self.setTextCursor(tc)
+ def keyPressEvent(self, event):
+ if self.completer:
+ if self.completer.popup().isVisible():
+ if event.key() in (
+ Qt.Key_Enter,
+ Qt.Key_Return,
+ Qt.Key_Escape,
+ Qt.Key_Tab,
+ Qt.Key_Backtab,
+ ):
+ # Ignore the event if the completer is visible and the key is one of the above:
+ event.ignore()
+ return
+ # Hide the completer when the user types a character:
+ self.completer.popup().hide()
+ if event.text() == ".":
+ # Show completions after typing a dot:
+ super().keyPressEvent(event)
+ self.updateCompleter(show_completions=True)
+ elif event.key() in (Qt.Key_Return, Qt.Key_Enter):
+ # Indent new lines after certain keywords:
+ cursor = self.textCursor()
+ line = cursor.block().text()
+ indentation = len(line) - len(line.lstrip(" "))
+ should_indent = any(
+ kw in line
+ for kw in [
+ "if",
+ "elif",
+ "else",
+ "for",
+ "while",
+ "try",
+ "except",
+ "finally",
+ "with",
+ "def",
+ "class",
+ ]
+ ) or line.endswith(":")
+ # Insert new line and indentation:
+ super().keyPressEvent(event)
+ self.insertPlainText(
+ " " * (indentation + (self.tab_length if should_indent else 0))
+ )
+ else:
+ super().keyPressEvent(event)
+ self.updateCompleter()
+ def updateCompleter(self, show_completions=False):
+ text_under_cursor = self.textUnderCursor()
+ if text_under_cursor != "" or show_completions:
+ # Get completions from Jedi:
+ script = jedi.Script(code=self.toPlainText(), path="temp.py")
+ completions = script.complete(
+ line=self.textCursor().blockNumber() + 1,
+ column=self.textCursor().columnNumber(),
+ )
+ completion_list = [c.name for c in completions]
+ # Update the completer model and show it:
+ self.completer.setModel(QStringListModel(completion_list))
+ if completion_list:
+ self.completer.setCompletionPrefix(text_under_cursor)
+ cr = self.cursorRect()
+ cr.setWidth(
+ self.completer.popup().sizeHintForColumn(0)
+ + self.completer.popup().verticalScrollBar().sizeHint().width()
+ )
+ self.completer.complete(cr) # popup it up!
+ elif show_completions:
+ # Close the completer if there's no text under cursor and no force show
+ self.completer.popup().hide()
+ def setPlainTextUndoable(self, text):
+ # Obtain the current text cursor from the editor
+ tc = self.textCursor()
+ # Start an undoable operation
+ tc.beginEditBlock()
+ # Select all text
+ tc.select(QTextCursor.Document)
+ # Remove the selected text (the entire document content)
+ tc.removeSelectedText()
+ # Insert the new text
+ tc.insertText(text)
+ # End the undoable operation
+ tc.endEditBlock()
\ No newline at end of file
diff --git a/src/microplugin/code_editor/python_syntax_highlighting.py b/src/microplugin/code_editor/python_syntax_highlighting.py
new file mode 100644
index 0000000..800a74d
--- /dev/null
+++ b/src/microplugin/code_editor/python_syntax_highlighting.py
@@ -0,0 +1,201 @@
+# syntax.py
+from qtpy import QtCore
+from qtpy import QtGui
+def format(color, style=''):
+ """Return a QTextCharFormat with the given attributes.
+ """
+ _color = QtGui.QColor()
+ _color.setNamedColor(color)
+ _format = QtGui.QTextCharFormat()
+ _format.setForeground(_color)
+ if 'bold' in style:
+ _format.setFontWeight(QtGui.QFont.Bold)
+ if 'italic' in style:
+ _format.setFontItalic(True)
+ return _format
+# Syntax styles that can be shared by all languages
+ 'keyword': format('#C68261'),
+ 'operator': format('#F0F0F0'),
+ 'brace': format('lightGray'),
+ 'defclass': format('#B4B6BC', 'bold'),
+ 'string': format('#5FA167'),
+ 'string2': format('#5FA167', 'italic'),
+ 'comment': format('#547760', 'italic'),
+ 'self': format('#864A7E'),
+ 'numbers': format('#25A2AF'),
+class PythonSyntaxHighlighter(QtGui.QSyntaxHighlighter):
+ """Syntax highlighter for the Python language.
+ """
+ # Python keywords
+ keywords = [
+ 'and', 'assert', 'break', 'class', 'continue', 'def',
+ 'del', 'elif', 'else', 'except', 'exec', 'finally',
+ 'for', 'from', 'global', 'if', 'import', 'in',
+ 'is', 'lambda', 'not', 'or', 'pass', 'print',
+ 'raise', 'return', 'try', 'while', 'yield',
+ 'None', 'True', 'False',
+ ]
+ # Python operators
+ operators = [
+ '=',
+ # Comparison
+ '==', '!=', '<', '<=', '>', '>=',
+ # Arithmetic
+ '\+', '-', '\*', '/', '//', '\%', '\*\*',
+ # In-place
+ '\+=', '-=', '\*=', '/=', '\%=',
+ # Bitwise
+ '\^', '\|', '\&', '\~', '>>', '<<',
+ # Dict:
+ '\|='
+ ]
+ # Python braces
+ braces = [
+ '\{', '\}', '\(', '\)', '\[', '\]',
+ ]
+ def __init__(self, parent: QtGui.QTextDocument) -> None:
+ super().__init__(parent)
+ # Multi-line strings (expression, flag, style)
+ self.tri_single = (QtCore.QRegExp("'''"), 1, STYLES['string2'])
+ self.tri_double = (QtCore.QRegExp('"""'), 2, STYLES['string2'])
+ rules = []
+ # Keyword, operator, and brace rules
+ rules += [(r'\b%s\b' % w, 0, STYLES['keyword'])
+ for w in PythonSyntaxHighlighter.keywords]
+ rules += [(r'%s' % o, 0, STYLES['operator'])
+ for o in PythonSyntaxHighlighter.operators]
+ rules += [(r'%s' % b, 0, STYLES['brace'])
+ for b in PythonSyntaxHighlighter.braces]
+ # All other rules
+ rules += [
+ # 'self'
+ (r'\bself\b', 0, STYLES['self']),
+ # 'def' followed by an identifier
+ (r'\bdef\b\s*(\w+)', 1, STYLES['defclass']),
+ # 'class' followed by an identifier
+ (r'\bclass\b\s*(\w+)', 1, STYLES['defclass']),
+ # Numeric literals
+ (r'\b[+-]?[0-9]+[lL]?\b', 0, STYLES['numbers']),
+ (r'\b[+-]?0[xX][0-9A-Fa-f]+[lL]?\b', 0, STYLES['numbers']),
+ (r'\b[+-]?[0-9]+(?:\.[0-9]+)?(?:[eE][+-]?[0-9]+)?\b', 0,
+ STYLES['numbers']),
+ # Double-quoted string, possibly containing escape sequences
+ (r'"[^"\\]*(\\.[^"\\]*)*"', 0, STYLES['string']),
+ # Single-quoted string, possibly containing escape sequences
+ (r"'[^'\\]*(\\.[^'\\]*)*'", 0, STYLES['string']),
+ # From '#' until a newline
+ (r'#[^\n]*', 0, STYLES['comment']),
+ ]
+ # Build a QRegExp for each pattern
+ self.rules = [(QtCore.QRegExp(pat), index, fmt)
+ for (pat, index, fmt) in rules]
+ def highlightBlock(self, text):
+ """Apply syntax highlighting to the given block of text.
+ """
+ self.tripleQuoutesWithinStrings = []
+ # Do other syntax formatting
+ for expression, nth, format in self.rules:
+ index = expression.indexIn(text, 0)
+ if index >= 0:
+ # if there is a string we check
+ # if there are some triple quotes within the string
+ # they will be ignored if they are matched again
+ if expression.pattern() in [r'"[^"\\]*(\\.[^"\\]*)*"',
+ r"'[^'\\]*(\\.[^'\\]*)*'"]:
+ innerIndex = self.tri_single[0].indexIn(text, index + 1)
+ if innerIndex == -1:
+ innerIndex = self.tri_double[0].indexIn(text, index + 1)
+ if innerIndex != -1:
+ tripleQuoteIndexes = range(innerIndex, innerIndex + 3)
+ self.tripleQuoutesWithinStrings.extend(
+ tripleQuoteIndexes)
+ while index >= 0:
+ # skipping triple quotes within strings
+ if index in self.tripleQuoutesWithinStrings:
+ index += 1
+ expression.indexIn(text, index)
+ continue
+ # We actually want the index of the nth match
+ index = expression.pos(nth)
+ length = len(expression.cap(nth))
+ self.setFormat(index, length, format)
+ index = expression.indexIn(text, index + length)
+ self.setCurrentBlockState(0)
+ # Do multi-line strings
+ in_multiline = self.match_multiline(text, *self.tri_single)
+ if not in_multiline:
+ in_multiline = self.match_multiline(text, *self.tri_double)
+ def match_multiline(self, text, delimiter, in_state, style):
+ """Do highlighting of multi-line strings. ``delimiter`` should be a
+ ``QRegExp`` for triple-single-quotes or triple-double-quotes, and
+ ``in_state`` should be a unique integer to represent the corresponding
+ state changes when inside those strings. Returns True if we're still
+ inside a multi-line string when this function is finished.
+ """
+ # If inside triple-single quotes, start at 0
+ if self.previousBlockState() == in_state:
+ start = 0
+ add = 0
+ # Otherwise, look for the delimiter on this line
+ else:
+ start = delimiter.indexIn(text)
+ # skipping triple quotes within strings
+ if start in self.tripleQuoutesWithinStrings:
+ return False
+ # Move past this match
+ add = delimiter.matchedLength()
+ # As long as there's a delimiter match on this line...
+ while start >= 0:
+ # Look for the ending delimiter
+ end = delimiter.indexIn(text, start + add)
+ # Ending delimiter on this line?
+ if end >= add:
+ length = end - start + add + delimiter.matchedLength()
+ self.setCurrentBlockState(0)
+ # No; multi-line string
+ else:
+ self.setCurrentBlockState(in_state)
+ length = len(text) - start + add
+ # Apply formatting
+ self.setFormat(start, length, style)
+ # Look for the next match
+ start = delimiter.indexIn(text, start + length)
+ # Return True if still inside a multi-line string, False otherwise
+ if self.currentBlockState() == in_state:
+ return True
+ else:
+ return False
+# Custom dialog for displaying large amounts of text within reasonable limits
+from qtpy.QtCore import Qt
+from qtpy.QtWidgets import QDialog, QTextEdit, QSizePolicy, QVBoxLayout, \
+ QPushButton
+class TextDialog(QDialog):
+ def __init__(self, title, text, icon=None,
+ parent=None): # icon parameter is optional and can be None
+ super().__init__(parent)
+ self.setWindowTitle(title)
+ self.setWindowModality(Qt.ApplicationModal)
+ # Use a QTextEdit inside the dialog to display long messages
+ self.textEdit = QTextEdit()
+ self.textEdit.setText(text)
+ self.textEdit.setReadOnly(True)
+ self.textEdit.setSizePolicy(QSizePolicy.Expanding,
+ QSizePolicy.Expanding)
+ # "OK" button to close the dialog
+ self.okButton = QPushButton("OK")
+ # Connect the button's clicked signal to the dialog's close slot:
+ self.okButton.clicked.connect(self.close)
+ # Layout
+ layout = QVBoxLayout()
+ layout.addWidget(self.textEdit)
+ # Add the button to the layout, aligned at the center
+ layout.addWidget(self.okButton, 0, Qt.AlignCenter)
+ self.setLayout(layout)
+ # Set the window icon if an icon is provided
+ if icon is not None:
+ self.setWindowIcon(icon)
\ No newline at end of file
+from typing import Optional
+from arbol import aprint
+from qtpy.QtWidgets import QHBoxLayout, QWidget, QPushButton, QLabel, \
+ QSizePolicy, QLineEdit, QTextEdit
+class TextInputWidget(QWidget):
+ def __init__(self,
+ max_height: int = 50,
+ margin: int = 0,
+ parent=None):
+ super().__init__(parent=parent)
+ # Initialize callbacks:
+ self.enter_callback = None
+ self.cancel_callback = None
+ self.do_after_callable = None
+ #
+ self.multi_line = False
+ # Initialize widgets:
+ self.initUI(max_height=max_height,
+ margin=margin)
+ def initUI(self,
+ max_height: int,
+ margin: int):
+ # Layout:
+ self.layout = QHBoxLayout(self)
+ # Message label:
+ self.message_label = QLabel()
+ self.layout.addWidget(self.message_label)
+ # Input fields (both single-line and multi-line, hidden by default):
+ self.single_line_input = QLineEdit()
+ self.single_line_input.setPlaceholderText("Enter text here")
+ self.single_line_input.setSizePolicy(QSizePolicy.Expanding,
+ QSizePolicy.Preferred)
+ self.single_line_input.returnPressed.connect(self.on_enter)
+ self.layout.addWidget(self.single_line_input)
+ self.multi_line_input = QTextEdit()
+ self.multi_line_input.setPlaceholderText("Enter text here")
+ self.multi_line_input.setSizePolicy(QSizePolicy.Expanding,
+ QSizePolicy.Preferred)
+ self.layout.addWidget(self.multi_line_input)
+ self.multi_line_input.hide() # Initially hidden
+ # Buttons:
+ self.enter_button = QPushButton('Enter')
+ self.enter_button.clicked.connect(self.on_enter)
+ self.layout.addWidget(self.enter_button)
+ self.cancel_button = QPushButton('Cancel')
+ self.cancel_button.clicked.connect(self.on_cancel)
+ self.layout.addWidget(self.cancel_button)
+ self.layout.setContentsMargins(margin, margin, margin, margin)
+ self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Minimum)
+ self.setMaximumHeight(max_height)
+ self.hide()
+ def show_input(self, message: str,
+ placeholder_text: str = "Enter input here",
+ default_text: str = '',
+ enter_text: str = "Enter",
+ cancel_text: str = "Cancel",
+ enter_callback=None,
+ cancel_callback=None,
+ do_after_callable=None,
+ multi_line: bool = False,
+ max_height: Optional[int] = None):
+ # Set multi_line:
+ self.multi_line = multi_line
+ # Set the maximum height if provided:
+ if max_height:
+ self.setMaximumHeight(max_height)
+ # Hide both inputs initially:
+ self.single_line_input.hide()
+ self.multi_line_input.hide()
+ # Determine which input field to show based on multi_line:
+ if multi_line:
+ self.current_input = self.multi_line_input
+ else:
+ self.current_input = self.single_line_input
+ # Configure the chosen input field and show it:
+ self.current_input.setText(default_text)
+ self.current_input.setPlaceholderText(placeholder_text)
+ self.current_input.show()
+ # Update labels and buttons:
+ self.message_label.setText(message)
+ self.enter_button.setText(enter_text)
+ self.cancel_button.setText(cancel_text)
+ # Show or hide the cancel button based on cancel_text:
+ if cancel_text:
+ self.cancel_button.show()
+ else:
+ self.cancel_button.hide()
+ # Set the callbacks:
+ self.enter_callback = enter_callback
+ self.cancel_callback = cancel_callback
+ self.do_after_callable = do_after_callable
+ # Show the widget:
+ self.show()
+ def on_enter(self):
+ input_text = self.current_input.text() if not self.multi_line else self.current_input.toPlainText()
+ try:
+ if self.enter_callback:
+ self.enter_callback(input_text)
+ except Exception as e:
+ aprint(f'Error in on_enter: {e}')
+ import traceback
+ traceback.print_exc()
+ finally:
+ self.hide()
+ if self.do_after_callable:
+ self.do_after_callable(input_text)
+ def on_cancel(self):
+ try:
+ if self.cancel_callback:
+ self.cancel_callback('')
+ except Exception as e:
+ aprint(f'Error in on_enter: {e}')
+ import traceback
+ traceback.print_exc()
+ finally:
+ self.hide()
+ if self.do_after_callable:
+ self.do_after_callable('')
+from typing import Optional
+from arbol import aprint
+from qtpy.QtWidgets import QHBoxLayout, QWidget, QPushButton, QLabel, \
+ QSizePolicy
+class YesNoCancelQuestionWidget(QWidget):
+ def __init__(self,
+ max_height: int = 50,
+ margin: int = 0,
+ parent=None):
+ super().__init__(parent=parent)
+ # Initialize callbacks:
+ self.yes_callback = None
+ self.no_callback = None
+ self.cancel_callback = None
+ self.do_after_callable = None
+ # Initialize widgets:
+ self.initUI(max_height=max_height,
+ margin=margin)
+ def initUI(self, max_height: int,
+ margin: int):
+ layout = QHBoxLayout()
+ # Set message:
+ self.message_label = QLabel()
+ self.message_label.setText('')
+ # Buttons:
+ self.yes_button = QPushButton('')
+ self.no_button = QPushButton('')
+ self.cancel_button = QPushButton('')
+ # Connect buttons to callbacks:
+ self.yes_button.clicked.connect(self.on_yes)
+ self.no_button.clicked.connect(self.on_no)
+ self.cancel_button.clicked.connect(self.on_cancel)
+ # Add widgets to layout:
+ layout.addWidget(self.message_label, 1)
+ layout.addWidget(self.yes_button)
+ layout.addWidget(self.no_button)
+ layout.addWidget(self.cancel_button)
+ layout.setContentsMargins(margin, margin, margin,
+ margin) # Reduce margins if necessary
+ self.setLayout(layout)
+ # Set the vertical size policy to Minimum so it takes the least vertical space
+ self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Minimum)
+ # Attempt to directly control the widget's size
+ self.setMaximumHeight(max_height) # Adjust 100 to your needs
+ # Hide the widget initially:
+ self.hide()
+ def show_question(self, message: str,
+ yes_text: str = "Yes",
+ no_text: str = "No",
+ cancel_text: Optional[str] = "Cancel",
+ yes_callback=None,
+ no_callback=None,
+ cancel_callback=None,
+ do_after_callable=None):
+ self.message_label.setText(message)
+ self.yes_button.setText(yes_text)
+ self.no_button.setText(no_text)
+ if cancel_text:
+ self.cancel_button.setText(cancel_text)
+ self.cancel_button.show()
+ else:
+ self.cancel_button.hide()
+ self.yes_callback = yes_callback
+ self.no_callback = no_callback
+ self.cancel_callback = cancel_callback
+ self.do_after_callable = do_after_callable
+ self.show()
+ def on_yes(self):
+ try:
+ if self.yes_callback:
+ self.yes_callback()
+ except Exception as e:
+ aprint(f'Error in on_yes: {e}')
+ import traceback
+ traceback.print_exc()
+ finally:
+ self.hide()
+ if self.do_after_callable:
+ self.do_after_callable()
+ def on_no(self):
+ try:
+ if self.no_callback:
+ self.no_callback()
+ except Exception as e:
+ aprint(f'Error in on_no: {e}')
+ import traceback
+ traceback.print_exc()
+ finally:
+ self.hide()
+ if self.do_after_callable:
+ self.do_after_callable()
+ def on_cancel(self):
+ try:
+ if self.cancel_callback:
+ self.cancel_callback()
+ except Exception as e:
+ aprint(f'Error in on_cancel: {e}')
+ import traceback
+ traceback.print_exc()
+ finally:
+ self.hide()
+ if self.do_after_callable:
+ self.do_after_callable()
+import sys
+import tempfile
+import traceback
+from qtpy.QtWidgets import QApplication
+from microplugin.microplugin_window import MicroPluginMainWindow
+def myExceptionHook(exctype, value, tb):
+ traceback.print_exception(exctype, value, tb)
+ # You can also log to a file here or show a dialog to the user
+# Override the default excepthook with our custom function
+sys.excepthook = myExceptionHook
+# Enable garbage collection debugging:
+if __name__ == "__main__":
+ app = QApplication(sys.argv)
+ # temp folder as obtained with tempfile.TemporaryDirectory() in the main script:
+ temp_folder = tempfile.TemporaryDirectory()
+ # Turn off the singleton pattern:
+ MicroPluginMainWindow._singleton_pattern_active = False
+ # Instantiate the MicroPluginMainWindow:
+ mainWindow = MicroPluginMainWindow(napari_viewer=None,
+ folder_path=temp_folder.name)
+ # Show the MicroPluginMainWindow:
+ mainWindow.show()
+ sys.exit(app.exec_())
+from pathlib import Path
+from typing import Union
+from black import FileMode, format_file_in_place, WriteBack
+def format_code(code: str) -> str:
+ """Format the code using black."""
+ try:
+ from black import format_str, FileMode, InvalidInput
+ # Format the code string using Black
+ formatted_code_str = format_str(code, mode=FileMode())
+ return formatted_code_str
+ except Exception as e:
+ import traceback
+ print(traceback.format_exc())
+ return code
+def format_file(file_path: Union[str, Path]) -> None:
+ """Format the file using black."""
+ try:
+ # Ensure file_path is a Path object
+ if isinstance(file_path, str):
+ file_path = Path(file_path)
+ # Format the file using Black
+ format_file_in_place(file_path,
+ fast=False,
+ mode=FileMode(),
+ write_back=WriteBack.YES)
+ except Exception as e:
+ import traceback
+ print(traceback.format_exc())
+ finally:
+ pass
+import os
+import sys
+from typing import Tuple, Optional
+from arbol import aprint
+from napari._qt.qt_resources import get_current_stylesheet
+from qtpy.QtCore import Qt
+from qtpy.QtWidgets import QApplication
+from microplugin.code_editor.code_snippet_editor_window import \
+ CodeSnippetEditorWindow
+class MicroPluginMainWindow(CodeSnippetEditorWindow):
+ _singleton_pattern_active = True
+ _singleton_instance = None
+ _singleton_instance_initialized = False
+ def __new__(cls, *args, **kwargs):
+ if cls._singleton_pattern_active:
+ if cls._singleton_instance is None:
+ # Call __new__ of the parent class and save the instance:
+ cls._singleton_instance = super(MicroPluginMainWindow, cls).__new__(cls)
+ return cls._singleton_instance
+ else:
+ # We still want the last instance to be recorded:
+ cls._singleton_instance = super().__new__(cls)
+ return cls._singleton_instance
+ def __init__(self,
+ napari_viewer,
+ folder_path: Optional[str] = None,
+ size: Optional[Tuple[int, int]] = None,
+ parent=None,
+ *args,
+ **kwargs):
+ # If the singleton instance is already initialized, just return it:
+ if MicroPluginMainWindow._singleton_pattern_active and MicroPluginMainWindow._singleton_instance_initialized:
+ return
+ if folder_path is None:
+ # Local import to avoid circular import:
+ from napari_chatgpt.utils.configuration.app_configuration import \
+ AppConfiguration
+ # Get configuration
+ config = AppConfiguration('microplugins')
+ # default folder is microplugins in the user's home directory:
+ default_folder_path = '~/microplugins'
+ # Get the folder path:
+ folder_path = config.get('folder', default_folder_path)
+ # Make sure that the folder exists:
+ folder_path = os.path.expanduser(folder_path)
+ if not os.path.exists(folder_path):
+ os.makedirs(folder_path)
+ super(MicroPluginMainWindow, self).__init__(folder_path=folder_path,
+ title="Micro-Plugins Editor",
+ size=size,
+ variables={'viewer':napari_viewer},
+ parent=parent,
+ *args,
+ **kwargs)
+ # Disable discovery worker until we send:
+ self.code_editor_widget.client.discover_worker.is_enabled = False
+ # Make sure that when user closes the window, it just hides the window, it does not really close it:
+ self.setAttribute(Qt.WA_DeleteOnClose, False)
+ # Get current stylesheet from napari:
+ current_style = get_current_stylesheet()
+ self.setStyleSheet(current_style)
+ # Set window size and position to be centered and occupy specified screen space
+ try:
+ screen = QApplication.primaryScreen().geometry()
+ desired_width = screen.width() * 0.5 # 50% of screen width
+ desired_height = screen.height() * 0.6 # 60% of screen height
+ left = (screen.width() - desired_width) / 2
+ top = (screen.height() - desired_height) / 2
+ self.setGeometry(int(left), int(top), int(desired_width), int(desired_height))
+ except Exception as e:
+ aprint(f'Error setting window size and position: {e}')
+ import traceback
+ traceback.print_exc()
+ # LLM settings:
+ self.llm_model_name = None
+ # Set the instance as initialized:
+ MicroPluginMainWindow._singleton_instance_initialized = True
+ @staticmethod
+ def add_snippet(filename: str,
+ code: Optional[str] = None):
+ # Create a new file with the given code:
+ MicroPluginMainWindow._singleton_instance.code_editor_widget.new_file(filename=filename, code=code, postfix_if_exists='_new_from_omega')
+if __name__ == "__main__":
+ app = QApplication(sys.argv)
+ mainWindow = MicroPluginMainWindow()
+ mainWindow.show()
+ sys.exit(app.exec_())
+import os
+import socket
+import time
+from qtpy.QtCore import Slot, QObject, Signal
+class BroadcastWorker(QObject):
+ error = Signal(Exception)
+ def __init__(self,
+ sock,
+ multicast_groups,
+ port,
+ broadcast_interval: int = 1):
+ super().__init__()
+ # Store the socket object:
+ self.sock = sock
+ # Multicast group address:
+ self.multicast_groups = multicast_groups
+ # Port number that the server listens on:
+ self.port = port
+ # Broadcast interval:
+ self.broadcast_interval = broadcast_interval
+ # Flags to control the worker:
+ self.is_enabled = True
+ self.is_running = True
+ def set_enabled(self, enabled):
+ self.is_enabled = enabled
+ def stop(self):
+ self.is_running = False
+ @Slot()
+ def broadcast(self):
+ # Run the broadcast loop:
+ while self.is_running:
+ try:
+ if self.is_enabled:
+ for self.multicast_group in self.multicast_groups:
+ # get hostname:
+ hostname = socket.gethostname()
+ # get username:
+ username = os.getlogin()
+ # Format the message to include hostname and port
+ message = f"{username}:{hostname}:{self.port}".encode()
+ # Send the message to the multicast group:
+ self.sock.sendto(message, self.multicast_group)
+ else:
+ # If the broadcast is disabled, just sleep for a while
+ pass
+ time.sleep(self.broadcast_interval)
+ # Handle exceptions and emit an error signal:
+ except Exception as e:
+ import traceback
+ traceback.print_exc()
+ self.error.emit(e)
+ # Note: exception handling is within the loop so that the thread doesn't die
+import json
+import os
+import socket
+from threading import Lock
+from time import sleep
+from arbol import aprint
+from qtpy.QtCore import QObject, Signal, Slot, QThread
+from microplugin.network.code_drop_server import CodeDropServer
+from microplugin.network.discover_worker import DiscoverWorker
+class CodeDropClient(QObject):
+ def __init__(self, multicast_groups=None):
+ super().__init__()
+ if multicast_groups is None:
+ multicast_groups = CodeDropServer._code_drop_multicast_groups
+ self.multicast_groups = multicast_groups
+ self.servers = {} # Mapping server names to addresses
+ # Store thread and worker references to prevent premature garbage collection
+ self.discover_thread = None
+ self.discover_worker = None
+ self.send_thread = None
+ self.send_worker = None
+ # initialise locks:
+ self.sending_lock = Lock()
+ self.init_discovery()
+ def init_discovery(self):
+ # Create a worker and move it to a thread
+ self.discover_thread = QThread()
+ self.discover_thread.setTerminationEnabled(True)
+ self.discover_thread.setObjectName("DiscoverThread")
+ self.discover_worker = DiscoverWorker(self.multicast_groups)
+ self.discover_worker.moveToThread(self.discover_thread)
+ # Ensure the thread is properly stopped and cleaned up before exiting
+ self.discover_thread.started.connect(
+ self.discover_worker.discover_servers)
+ self.discover_worker.server_discovered.connect(self.update_servers)
+ self.discover_worker.error.connect(self.handle_error)
+ # Cleanup on completion
+ # self.discover_worker.finished.connect(self.discover_thread.quit)
+ # self.discover_worker.finished.connect(self.discover_worker.deleteLater)
+ # self.discover_thread.finished.connect(self.discover_thread.deleteLater)
+ def start_discovering(self):
+ if self.discover_thread is not None:
+ # Start the thread and begin discovering servers:
+ self.discover_thread.start()
+ def stop_discovering(self):
+ if self.discover_worker and self.discover_thread:
+ # Ensure there's a stop method to signal the worker to terminate:
+ self.discover_worker.stop()
+ self.discover_thread.quit()
+ self.discover_thread.wait()
+ self.discover_thread = None
+ def update_servers(self, user_name, server_name, server_address,
+ server_port):
+ # Server name and port are the key:
+ key = f"{server_name}:{server_port}"
+ self.servers[key] = (user_name, server_address, server_port)
+ # Update your GUI or data structure with new server information here
+ def send_code_message(self,
+ server_address: str,
+ server_port: int,
+ filename: str,
+ code: str):
+ # get hostname:
+ hostname = socket.gethostname()
+ # Get username (login) from the system:
+ username = os.getlogin()
+ # Message dict:
+ message_dict = {
+ 'hostname': hostname,
+ 'username': username,
+ 'filename': filename,
+ 'code': code}
+ # Convert dict to JSON string:
+ message_str = json.dumps(message_dict)
+ # Send message:
+ self.send_message_by_address(server_address,
+ server_port,
+ message_str)
+ def send_message_by_address(self,
+ server_address: str,
+ server_port: int,
+ message: str):
+ with self.sending_lock:
+ # Check if there's already a thread running for sending messages:
+ max_number_of_attempts: int = 10
+ while self.send_thread is not None and self.send_thread.isRunning():
+ self.send_thread.quit()
+ self.send_thread.wait()
+ aprint(
+ "A send thread is already running. Wait for it to finish.")
+ sleep(0.1)
+ max_number_of_attempts -= 1
+ # If the thread is taking too long, stop waiting and don't send the message:
+ if max_number_of_attempts == 0:
+ aprint(
+ "Max number of attempts reached. Can't send message.")
+ return
+ # Create a QThread each time for sending messages
+ self.send_thread = QThread()
+ self.send_thread.setTerminationEnabled(True)
+ self.send_thread.setObjectName("SendThread")
+ # Create a worker to send the message and move it to the thread:
+ self.send_worker = self.create_send_worker(server_address,
+ server_port, message)
+ self.send_worker.moveToThread(self.send_thread)
+ # Connect the thread started signal to the worker's send method:
+ self.send_thread.started.connect(self.send_worker.send)
+ # Start the thread:
+ self.send_thread.start()
+ aprint(
+ f"Sending message of length: {len(message)} to {server_address}:{server_port}")
+ def create_send_worker(self, server_address, server_port, message):
+ parent_self = self
+ class SendWorker(QObject):
+ finished = Signal()
+ @Slot()
+ def send(self):
+ with parent_self.sending_lock:
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((server_address, server_port))
+ sock.sendall(message.encode())
+ aprint(
+ f"Message of length: {len(message)} sent to {server_address}:{server_port}")
+ except Exception as e:
+ aprint(f"Error sending message: {e}")
+ import traceback
+ traceback.print_exc()
+ parent_self.handle_error(e)
+ finally:
+ sock.close()
+ self.finished.emit()
+ parent_self.send_worker = None
+ return SendWorker()
+ def handle_error(self, e):
+ aprint(f"Error: {e}")
+ def stop(self):
+ self.stop_discovering()
+import random
+import socket
+import struct
+from typing import Optional, Callable
+from arbol import aprint
+from qtpy.QtCore import QObject, QThread
+from microplugin.network.broadcast_worker import BroadcastWorker
+from microplugin.network.receive_worker import ReceiveWorker
+class CodeDropServer(QObject):
+ _code_drop_multicast_groups = [('', 5007), ('', 5008)]
+ def __init__(self,
+ callback: Callable,
+ multicast_groups=None,
+ server_port: Optional[int] = None):
+ super().__init__()
+ if multicast_groups is None:
+ # choose a random multicast group:
+ multicast_groups = CodeDropServer._code_drop_multicast_groups
+ aprint(f"Multicast group: {multicast_groups}")
+ if server_port is None:
+ # choose a random valid port number:
+ server_port = self._find_port()
+ aprint(f"Server port chosen: {server_port}")
+ # Store the multicast group:
+ self.multicast_groups = multicast_groups
+ # This machine's hostname:
+ self.server_hostname = socket.gethostname()
+ # Store the server port number:
+ self.server_port = server_port
+ # Store the callback function:
+ self.callback = callback
+ # Create a socket for broadcasting:
+ self.broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.broadcast_socket.setsockopt(socket.IPPROTO_IP,
+ socket.IP_MULTICAST_TTL, struct.pack('b', 32) )
+ # Setup the broadcast worker and thread:
+ self.broadcast_thread = QThread()
+ self.broadcast_thread.setTerminationEnabled(True)
+ self.broadcast_thread.setObjectName("BroadcastThread")
+ self.broadcast_worker = BroadcastWorker(self.broadcast_socket,
+ self.multicast_groups,
+ self.server_port)
+ self.broadcast_worker.moveToThread(self.broadcast_thread)
+ self.broadcast_thread.started.connect(self.broadcast_worker.broadcast)
+ self.broadcast_worker.error.connect(self.handle_error)
+ # Setup the receive worker and thread:
+ self.receive_thread = QThread()
+ self.receive_thread.setTerminationEnabled(True)
+ self.receive_thread.setObjectName("ReceiveThread")
+ self.receive_worker = ReceiveWorker(self.server_port)
+ self.receive_worker.moveToThread(self.receive_thread)
+ self.receive_thread.started.connect(
+ self.receive_worker.receive_messages)
+ self.receive_worker.message_received.connect(
+ lambda addr, msg: self.callback(addr, msg))
+ self.receive_worker.error.connect(self.handle_error)
+ def _find_port(self):
+ port = 5000 + random.randint(0, 100)
+ while True:
+ try:
+ # check if the port is already in use:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.bind(('', port))
+ s.close()
+ break
+ except OSError:
+ port = 5000 + random.randint(0, 100)
+ return port
+ def start_broadcasting(self):
+ self.broadcast_thread.start()
+ def start_receiving(self):
+ self.receive_thread.start()
+ def stop_broadcasting(self):
+ if self.broadcast_worker and self.broadcast_thread:
+ # Stop the broadcast worker and thread:
+ self.broadcast_worker.stop()
+ self.broadcast_thread.quit()
+ self.broadcast_thread.wait()
+ def stop_receiving(self):
+ if self.receive_worker and self.receive_thread:
+ # Stop the receive worker and thread:
+ self.receive_worker.stop()
+ self.receive_thread.quit()
+ self.receive_thread.wait()
+ def stop(self):
+ self.stop_broadcasting()
+ self.stop_receiving()
+ def handle_error(self, e):
+ aprint(f"Error: {e}")
+import sys
+from qtpy.QtWidgets import QApplication
+from microplugin.network.code_drop_client import CodeDropClient
+def on_server_discovered(server_name, server_address, server_port):
+ print(f"Discovered server: {server_name} at {server_address}")
+ client.send_message_by_address(server_address, server_port,
+ "Hello, Server!")
+if __name__ == "__main__":
+ app = QApplication(sys.argv)
+ client = CodeDropClient()
+ client.discover_worker.server_discovered.connect(on_server_discovered)
+ client.start_discovering()
+ # open a widget to send messages:
+ # client_ui = CodeDropClientWidget(client)
+ # client_ui.show()
+ sys.exit(app.exec_())
+ print(client)
+import sys
+from qtpy.QtWidgets import QApplication
+from microplugin.network.code_drop_server import CodeDropServer
+def server_message_received(addr, message):
+ print(f"Message from {addr}: {message}")
+if __name__ == "__main__":
+ app = QApplication(sys.argv)
+ server = CodeDropServer(server_message_received)
+ server.start_broadcasting()
+ server.start_receiving()
+ sys.exit(app.exec_())
+import socket
+import struct
+from time import sleep
+from arbol import aprint
+from qtpy.QtCore import Slot, QObject, Signal
+class DiscoverWorker(QObject):
+ # Signal for discovered servers:
+ server_discovered = Signal(str, str, str,
+ int) # user_name, server_name, server_addr, server_port
+ # Signal for errors:
+ error = Signal(Exception)
+ # Signal to indicate the worker has finished its work
+ finished = Signal()
+ def __init__(self, multicast_groups):
+ super().__init__()
+ self.multicast_groups = multicast_groups
+ self.is_running = True
+ self.is_enabled = True
+ def stop(self):
+ self.is_running = False
+ def close(self):
+ self.stop()
+ @Slot()
+ def discover_servers(self):
+ try:
+ available_multicast_group = None
+ # Trying to bind to any of multicast groups (usefull for testing purposes):
+ for multicast_group in self.multicast_groups:
+ try:
+ # Create a socket to listen for multicast messages:
+ broadcast_listening_socket = socket.socket(socket.AF_INET,
+ socket.SOCK_DGRAM,
+ socket.IPPROTO_UDP)
+ broadcast_listening_socket.setsockopt(socket.SOL_SOCKET,
+ socket.SO_REUSEADDR,
+ 1)
+ # Bind the socket to the multicast group:
+ broadcast_listening_socket.bind(('', multicast_group[1]))
+ # Store the multicast group that worked:
+ available_multicast_group = multicast_group
+ aprint(f"Bound to multicast group: {multicast_group}")
+ # If we reach this point without any exceptions, we can assume the socket is ready to receive messages:
+ break
+ except OSError as e:
+ aprint(
+ f"Error binding to multicast group {multicast_group}: {e}")
+ import traceback
+ traceback.print_exc()
+ aprint(f"Most likely the multicast group is already in use by another instance of Omega! Only affects sending of code snippets.")
+ # Tell the kernel to add the multicast group to the multicast group:
+ mreq = struct.pack("4sl",
+ socket.inet_aton(available_multicast_group[0]),
+ socket.INADDR_ANY)
+ # Join the multicast group:
+ broadcast_listening_socket.setsockopt(socket.IPPROTO_IP,
+ mreq)
+ # Set a timeout of 5 seconds
+ broadcast_listening_socket.settimeout(1.0)
+ # Counter use to keep track of how many times we timedout
+ counter = 0
+ while self.is_running:
+ try:
+ if self.is_enabled:
+ try:
+ # Receive the data and sender's address:
+ data, addr = broadcast_listening_socket.recvfrom(1024)
+ if data:
+ # Directly use the received data, assuming it's in 'hostname:port' format
+ server_info = data.decode().strip()
+ user_name, server_name, server_port = server_info.split(
+ ':')
+ server_addr = addr[0]
+ server_port = int(server_port)
+ self.server_discovered.emit(user_name, server_name,
+ server_addr, server_port)
+ else:
+ break # No more data, stop the loop
+ except socket.timeout:
+ counter = counter + 1
+ if counter > 30:
+ aprint("No servers discovered received within the last 30 seconds.")
+ counter = 0
+ else:
+ # If discovery is disabled, just sleep for a while:
+ sleep(0.5)
+ except Exception as e:
+ import traceback
+ traceback.print_exc()
+ self.error.emit(e)
+ # Note: exception handling is within the loop so that the thread doesn't die
+ except Exception as e:
+ import traceback
+ traceback.print_exc()
+ self.error.emit(e)
+ finally:
+ broadcast_listening_socket.close()
+ self.finished.emit() # Emit finished signal when done
+ return
+import socket
+from arbol import aprint
+from qtpy.QtCore import Slot, QObject, Signal
+class ReceiveWorker(QObject):
+ # Signal for received messages:
+ message_received = Signal(tuple, str) # Signal for received messages
+ # Signal for errors:
+ error = Signal(Exception)
+ def __init__(self, port):
+ super().__init__()
+ self.port = port
+ self.is_running = True
+ def stop(self):
+ self.is_running = False
+ @Slot()
+ def receive_messages(self):
+ try:
+ aprint(f"Listening for messages on port: {self.port}")
+ # Create a socket to listen for incoming connections:
+ server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ # Bind the socket to the port:
+ server_socket.bind(('', self.port))
+ # Listen for incoming connections:
+ server_socket.listen(5)
+ # Set a timeout of 1 seconds
+ server_socket.settimeout(1.0)
+ while self.is_running:
+ try:
+ client_socket = None
+ # Accept the connection:
+ client_socket, addr = server_socket.accept()
+ aprint(f"Connection from: {addr}")
+ chunks = []
+ while True:
+ chunk = client_socket.recv(1024)
+ if not chunk:
+ break # Connection is closed, and we've received the complete message.
+ chunks.append(chunk)
+ # Combine the chunks to form the complete message:
+ message = b''.join(chunks).decode()
+ # Emit the message received signal:
+ self.message_received.emit(addr, message)
+ except socket.timeout:
+ # No biggie! Just keep listening:
+ pass
+ except Exception as e:
+ # That's a biggie! Emit the error signal:
+ import traceback
+ traceback.print_exc()
+ self.error.emit(e)
+ finally:
+ # Close the client socket:
+ if client_socket:
+ client_socket.close()
+ except Exception as e:
+ import traceback
+ traceback.print_exc()
+ self.error.emit(e)
+ finally:
+ server_socket.close()
+import sys
+from PyQt5.QtCore import Qt
+from PyQt5.QtWidgets import QApplication
+from PyQt5.QtWidgets import QGridLayout, QPushButton, QStyle, QWidget
+QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) # Enable high-DPI scaling
+QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True) # Use high-DPI icons
+class Window(QWidget):
+ def __init__(self):
+ super(Window, self).__init__()
+ icons = sorted([attr for attr in dir(QStyle) if attr.startswith("SP_")])
+ layout = QGridLayout()
+ for n, name in enumerate(icons):
+ btn = QPushButton(name)
+ pixmapi = getattr(QStyle, name)
+ icon = self.style().standardIcon(pixmapi)
+ btn.setIcon(icon)
+ layout.addWidget(btn, n / 4, n % 4)
+ self.setLayout(layout)
+app = QApplication(sys.argv)
+w = Window()
\ No newline at end of file
result = chat(messages)
-from langchain.agents import initialize_agent, AgentType, AgentExecutor
+from langchain.agents import AgentExecutor
from langchain_community.chat_models import ChatOpenAI
from napari_chatgpt.omega.tools.search.web_search_tool import WebSearchTool
from napari_chatgpt.omega.tools.special.python_repl import \
-from langchain.tools import BearlyInterpreterTool, DuckDuckGoSearchRun
llm = ChatOpenAI(temperature=0, model="gpt-4-1106-preview")
# Define a list of tools offered by the agent
import base64
-import openai
-import os
from openai.resources.chat import Completions
-from openai.types.chat import ChatCompletion
# Updated file path to a JPEG image
#image_path = "/Users/royer/Downloads/image.jpeg"
diff --git a/src/napari_chatgpt/_sandbox/qtawesome_demo.py b/src/napari_chatgpt/_sandbox/qtawesome_demo.py
+# -*- coding: utf-8 -*-
+# Standard library imports
+import sys
+from qtawesome import icon, IconWidget, Spin, Pulse, font, set_global_defaults
+# Third party imports
+from qtpy import QtCore, QtWidgets
+class AwesomeExample(QtWidgets.QDialog):
+ def __init__(self):
+ super().__init__()
+ # Label for supported fonts
+ supported_fonts_label = QtWidgets.QLabel('Supported fonts (prefix)')
+ supported_fonts_label.setAlignment(QtCore.Qt.AlignCenter)
+ # Get FontAwesome 5.x icons by name in various styles by name
+ fa5_icon = icon('fa5.file-alt')
+ fa5_button = QtWidgets.QPushButton(fa5_icon, 'Font Awesome regular (fa5)')
+ fa5s_icon = icon('fa5s.flag')
+ fa5s_button = QtWidgets.QPushButton(fa5s_icon, 'Font Awesome solid (fa5s)')
+ fa5b_icon = icon('fa5b.github')
+ fa5b_button = QtWidgets.QPushButton(fa5b_icon, 'Font Awesome brands (fa5b)')
+ # Get Elusive icons by name
+ asl_icon = icon('ei.asl')
+ elusive_button = QtWidgets.QPushButton(asl_icon, 'Elusive Icons (ei)')
+ # Get Material Design icons by name
+ apn_icon = icon('mdi6.access-point-network')
+ mdi6_button = QtWidgets.QPushButton(apn_icon, 'Material Design (mdi, mdi6)')
+ # Get Phosphor by name
+ mic_icon = icon('ph.microphone-fill')
+ ph_button = QtWidgets.QPushButton(mic_icon, 'Phosphor Icons (ph)')
+ # Get Remix Icon by name
+ truck_icon = icon('ri.truck-fill')
+ ri_button = QtWidgets.QPushButton(truck_icon, 'Remix Icons (ri)')
+ # Get Microsoft's Codicons by name
+ squirrel_icon = icon('msc.squirrel')
+ msc_button = QtWidgets.QPushButton(squirrel_icon, 'Codicons (msc)')
+ # Label for style options and animations
+ styles_label = QtWidgets.QLabel('Styles')
+ styles_label.setAlignment(QtCore.Qt.AlignCenter)
+ # Rotated
+ rot_icon = icon('mdi.access-point-network', rotated=45)
+ rot_button = QtWidgets.QPushButton(rot_icon, 'Rotated Icons')
+ # Horizontal flip
+ hflip_icon = icon('mdi.account-alert', hflip=True)
+ hflip_button = QtWidgets.QPushButton(hflip_icon, 'Horizontally Flipped Icons')
+ # Vertical flip
+ vflip_icon = icon('mdi.account-alert', vflip=True)
+ vflip_button = QtWidgets.QPushButton(vflip_icon, 'Vertically Flipped Icons')
+ # Styling
+ styling_icon = icon('fa5s.music',
+ active='fa5s.balance-scale',
+ color='blue',
+ color_active='orange')
+ music_button = QtWidgets.QPushButton(styling_icon, 'Changing colors')
+ # Setting an alpha of 165 to the color of this icon. Alpha must be a number
+ # between 0 and 255.
+ icon_with_alpha = icon('mdi.heart', color=('red', 120))
+ heart_button = QtWidgets.QPushButton(icon_with_alpha, 'Setting alpha')
+ # Toggle
+ toggle_icon = icon('fa5s.home', selected='fa5s.balance-scale',
+ color_off='black',
+ color_off_active='blue',
+ color_on='orange',
+ color_on_active='yellow')
+ toggle_button = QtWidgets.QPushButton(toggle_icon, 'Toggle')
+ toggle_button.setCheckable(True)
+ iconwidget = IconWidget()
+ spin_icon = icon('mdi.loading', color='red',
+ animation=Spin(iconwidget))
+ iconwidget.setIcon(spin_icon)
+ iconwidget.setIconSize(QtCore.QSize(32, 32))
+ iconwidgetholder = QtWidgets.QWidget()
+ lo = QtWidgets.QHBoxLayout()
+ lo.addWidget(iconwidget)
+ lo.addWidget(QtWidgets.QLabel('IconWidget'))
+ iconwidgetholder.setLayout(lo)
+ iconwidget2 = IconWidget('mdi.web', color='blue', size=QtCore.QSize(16, 16))
+ # Icon drawn with the `image` option
+ drawn_image_icon = icon('ri.truck-fill',
+ options=[{'draw': 'image'}])
+ drawn_image_button = QtWidgets.QPushButton(drawn_image_icon,
+ 'Icon drawn as an image')
+ # Stack icons
+ camera_ban = icon('fa5s.camera', 'fa5s.ban',
+ options=[{'scale_factor': 0.5,
+ 'active': 'fa5s.balance-scale'},
+ {'color': 'red', 'opacity': 0.7}])
+ stack_button = QtWidgets.QPushButton(camera_ban, 'Stack')
+ stack_button.setIconSize(QtCore.QSize(32, 32))
+ # Stack and offset icons
+ saveall = icon('fa5.save', 'fa5.save',
+ options=[{'scale_factor': 0.8,
+ 'offset': (0.2, 0.2),
+ 'color': 'gray'},
+ {'scale_factor': 0.8}])
+ saveall_button = QtWidgets.QPushButton(saveall, 'Stack, offset')
+ # Spin icons
+ spin_button = QtWidgets.QPushButton(' Spinning icon')
+ animation1 = Spin(spin_button)
+ spin_icon = icon('fa5s.spinner', color='red', animation=animation1)
+ spin_button.setIcon(spin_icon)
+ timer1 = QtCore.QTimer()
+ timer1.singleShot(3000, animation1.stop)
+ # Pulse icons
+ pulse_button = QtWidgets.QPushButton(' Pulsing icon')
+ animation2 = Pulse(pulse_button, autostart=False)
+ pulse_icon = icon('fa5s.spinner', color='green',
+ animation=animation2)
+ pulse_button.setIcon(pulse_icon)
+ timer2 = QtCore.QTimer()
+ timer2.singleShot(1500, animation2.start)
+ timer3 = QtCore.QTimer()
+ timer3.singleShot(6000, animation2.stop)
+ # Stacked spin icons
+ stack_spin_button = QtWidgets.QPushButton('Stack spin')
+ options = [{'scale_factor': 0.4,
+ 'animation': Spin(stack_spin_button)},
+ {'color': 'blue'}]
+ stack_spin_icon = icon('ei.asl', 'fa5.square',
+ options=options)
+ stack_spin_button.setIcon(stack_spin_icon)
+ stack_spin_button.setIconSize(QtCore.QSize(32, 32))
+ # Render a label with this font
+ label = QtWidgets.QLabel(chr(0xf19c) + ' ' + 'Label')
+ label.setFont(font('fa', 16))
+ # Layout
+ grid = QtWidgets.QGridLayout()
+ fonts_widgets = [
+ supported_fonts_label,
+ fa5_button,
+ fa5s_button,
+ fa5b_button,
+ elusive_button,
+ mdi6_button,
+ ph_button,
+ ri_button,
+ msc_button,
+ ]
+ styled_widgets = [
+ styles_label,
+ music_button,
+ heart_button,
+ rot_button,
+ hflip_button,
+ vflip_button,
+ toggle_button,
+ drawn_image_button
+ ]
+ animated_widgets = [
+ spin_button,
+ pulse_button,
+ stack_button,
+ saveall_button,
+ stack_spin_button,
+ ]
+ other_widgets = [
+ label,
+ iconwidgetholder,
+ iconwidget2
+ ]
+ for idx, w in enumerate(fonts_widgets):
+ grid.addWidget(w, idx, 0)
+ for idx, w in enumerate(styled_widgets):
+ grid.addWidget(w, idx, 1)
+ for idx, w in enumerate(animated_widgets):
+ grid.addWidget(w, idx + len(styled_widgets), 1)
+ for idx, w in enumerate(other_widgets):
+ grid.addWidget(w, idx + len(styled_widgets) + len(animated_widgets), 1)
+ title = 'Awesome'
+ args = ' '.join(sys.argv[1:]).strip()
+ if args:
+ title += ' (' + args + ')'
+ self.setLayout(grid)
+ self.setWindowTitle(title)
+ self.setMinimumWidth(520)
+ self.show()
+def main():
+ global_defaults = {}
+ for arg in sys.argv[1:]:
+ try:
+ key, val = arg.split('=', maxsplit=1)
+ global_defaults[key] = val
+ except:
+ pass
+ if global_defaults:
+ set_global_defaults(**global_defaults)
+ app = QtWidgets.QApplication(sys.argv)
+ # Enable High DPI display with PyQt5
+ if hasattr(QtCore.Qt, 'AA_UseHighDpiPixmaps'):
+ app.setAttribute(QtCore.Qt.AA_UseHighDpiPixmaps)
+ # Timer needed to close the example application
+ # when testing
+ QtCore.QTimer.singleShot(10000, app.exit)
+ _ = AwesomeExample()
+ sys.exit(app.exec_())
+if __name__ == '__main__':
+ main()
\ No newline at end of file
import sys
-from PyQt5.QtWidgets import QApplication, QVBoxLayout, QWidget, QSlider, QLabel, QHBoxLayout
-from PyQt5.QtCore import Qt
import numpy as np
+from PyQt5.QtCore import Qt
+from PyQt5.QtWidgets import QApplication, QVBoxLayout, QWidget, QSlider, QLabel, \
+ QHBoxLayout
from vispy import scene
from vispy.scene import visuals
class OrthoViewWidget(QWidget):
def __init__(self, data, parent=None):
super(OrthoViewWidget, self).__init__(parent)
@@ -10,13 +10,9 @@ def test_omega_q_widget(make_napari_viewer, capsys):
viewer = make_napari_viewer()
viewer.add_image(np.random.random((100, 100)))
# create our widget, passing in the viewer
- my_widget = OmegaQWidget(viewer)
+ my_widget = OmegaQWidget(viewer, add_code_editor=False)
- # # call our widget method
- # my_widget._on_click()
- #
- # # read captured output and check that it's as we expected
- # captured = capsys.readouterr()
- # assert 'Omega' in captured.out
+ # read captured output and check that it's as we expected
+ captured = capsys.readouterr()
+ assert 'Omega' in captured.out
import traceback
from typing import TYPE_CHECKING, List
-from PyQt5.QtCore import Qt
-from PyQt5.QtWidgets import QApplication, QLabel, QCheckBox
-from PyQt5.QtWidgets import QVBoxLayout, QComboBox
from napari.viewer import Viewer
+from qtpy.QtCore import Qt
+from qtpy.QtWidgets import QApplication, QLabel, QCheckBox
from qtpy.QtWidgets import QPushButton, QWidget
+from qtpy.QtWidgets import QVBoxLayout, QComboBox
-from napari_chatgpt.chat_server.chat_server import NapariChatServer
+from microplugin.microplugin_window import MicroPluginMainWindow
from napari_chatgpt.utils.configuration.app_configuration import \
from napari_chatgpt.utils.ollama.ollama_server import is_ollama_running, \
@@ -24,6 +24,8 @@
from napari_chatgpt.utils.openai.model_list import get_openai_model_list
from napari_chatgpt.utils.python.installed_packages import \
+from napari_chatgpt.utils.qt.one_time_disclaimer_dialog import \
+ show_one_time_disclaimer_dialog
from napari_chatgpt.utils.qt.warning_dialog import show_warning_dialog
@@ -37,13 +39,15 @@
_creativity_mapping['moderately creative'] = 0.05
_creativity_mapping['creative'] = 0.1
+# Ensure the singleton pattern is on:
+MicroPluginMainWindow._singleton_pattern_active = True
class OmegaQWidget(QWidget):
# your QWidget.__init__ can optionally request the napari viewer instance
# in one of two ways:
# 1. use a parameter called `napari_viewer`, as done here
# 2. use a type annotation of 'napari.viewer.Viewer' for any parameter
- def __init__(self, napari_viewer):
+ def __init__(self, napari_viewer, add_code_editor=True):
aprint("OmegaQWidget instantiated!")
@@ -54,6 +58,7 @@ def __init__(self, napari_viewer):
self.viewer = napari_viewer
# Napari chat server instance:
+ from napari_chatgpt.chat_server.chat_server import NapariChatServer
self.server: NapariChatServer = None
# Create a QVBoxLayout instance
@@ -76,11 +81,28 @@ def __init__(self, napari_viewer):
+ # Instantiate the MicroPluginMainWindow:
+ if add_code_editor:
+ self.micro_plugin_main_window = MicroPluginMainWindow(napari_viewer=napari_viewer)
+ # Add the start Omega:
+ # Add the show editor button:
+ if add_code_editor:
+ self._show_editor_button()
# Set the layout on the application's window
+ # Make sure that when the viewer window closes this widget closes too:
+ try:
+ self.viewer.window._qt_window.destroyed.connect(self.close)
+ except Exception as e:
+ aprint(f"Error: {e}")
+ aprint("Could not connect to viewer's closed signal.")
+ traceback.print_exc()
def _model_selection(self):
aprint("Setting up model selection UI.")
@@ -305,7 +327,7 @@ def _autofix_widgets(self):
"When checked Omega will try to fix its own \n"
"coding mistakes when making widgets. \n"
"Works so-so with ChatGPT 3.5, but works well with ChatGPT 4.\n"
- "This involves a LLM call which can incur additional\n"
+ "This requires API calls which may incur additional\n"
"cost in time and possibly money.")
# Add the install_missing_packages checkbox to the layout:
@@ -322,8 +344,8 @@ def _tutorial_mode(self):
self.tutorial_mode_checkbox.setChecked(config.get('tutorial_mode_checkbox', False))
"When checked Omega will actively asks questions \n"
- "to clarify and disambiguate the request, and \n"
- "will propose multiple options and be didactic. ")
+ "to clarify and disambiguate the request, will propose \n"
+ "multiple options and try to be as didactic as possible. ")
# Add the install_missing_packages checkbox to the layout:
@@ -367,8 +389,8 @@ def _start_omega_button(self):
aprint("Setting up start Omega button UI.")
# Start Omega button:
- self.start_omega_button = QPushButton("Start Omega")
- self.start_omega_button.clicked.connect(self._on_click)
+ self.start_omega_button = QPushButton("Start Conversing with Omega")
+ self.start_omega_button.clicked.connect(self._start_omega)
"Start Omega, this will open a browser window.\n"
"You can restart Omega with new settings by\n"
@@ -377,10 +399,41 @@ def _start_omega_button(self):
# Omega button:
- def _on_click(self):
+ def _show_editor_button(self):
+ aprint("Setting up start Omega button UI.")
+ # Start Omega button:
+ self.show_editor_button = QPushButton("Show Omega's Code Editor")
+ self.show_editor_button.clicked.connect(self._show_editor)
+ self.show_editor_button.setToolTip(
+ "Shows Omega's microplugin code snippet editor. \n"
+ "All code generated by Omega is added to the editor\n"
+ "so you can conveniently find it again after restarting\n"
+ "napari, edit the code, reformat it, check for 'safety',\n"
+ "send it to colleagues across the local network, and\n"
+ "rerun the code. Running code for widgets adds the widget\n"
+ "back to the viewer.\n")
+ # Omega button:
+ self.layout.addWidget(self.show_editor_button)
+ def _start_omega(self):
with asection("Starting Omega now!"):
+ # First we show the Omega's disclaimer that explains that
+ # Omega is research software that can make changes to your data
+ # and machine if instructed to do so or if it misunderstands the
+ # requests.
+ show_one_time_disclaimer_dialog(
+ "Omega is research software that can make changes to your data "
+ "and machine if instructed to do so or if it misunderstands "
+ "your requests. It is not perfect and can make mistakes. "
+ "By clicking 'I agree' you acknowledge that you understand "
+ "the potential dangers and agree to use Omega at your own risk. "
+ "You can find more information about Omega's disclaimer "
+ "and terms of use at disclaimer."
+ )
# Stop previous instance if it exists:
if self.server:
aprint("Server already started")
@@ -433,6 +486,48 @@ def _on_click(self):
aprint("Omega failed to start. Please check the console for more information.")
+ def _show_editor(self):
+ try:
+ if not self.micro_plugin_main_window:
+ aprint("MicroPluginMainWindow not instantiated.")
+ return
+ with asection("Showing editor now!"):
+ # Set LLM parameters to self.micro_plugin_main_window:
+ self.micro_plugin_main_window.code_editor_widget.llm_model_name = self.model_combo_box.currentText()
+ # Show the editor:
+ self.micro_plugin_main_window.show()
+ # Make sure to bring the window to the front:
+ self.micro_plugin_main_window.raise_()
+ except Exception as e:
+ aprint(f"Error: {e}")
+ aprint("Omega failed to start. Please check the console for more information.")
+ traceback.print_exc()
+ def setStyleSheet(self, style):
+ # Set the stylesheet for the micro plugin main window:
+ if self.micro_plugin_main_window:
+ self.micro_plugin_main_window.setStyleSheet(style)
+ # Call the parent class method:
+ super().setStyleSheet(style)
+ def close(self):
+ if self.server:
+ self.server.stop()
+ if self.micro_plugin_main_window:
+ self.micro_plugin_main_window.hide()
+ self.micro_plugin_main_window.close()
+ super().close()
def main():
from time import sleep
import napari
-from PyQt5.QtCore import QTimer
from arbol import aprint, asection
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.templating import Jinja2Templates
from langchain.memory import ConversationTokenBufferMemory, \
from langchain.schema import get_buffer_string, BaseMemory
+from qtpy.QtCore import QTimer
from starlette.staticfiles import StaticFiles
from uvicorn import Config, Server
@@ -26,7 +26,6 @@
from napari_chatgpt.llm.llms import instantiate_LLMs
from napari_chatgpt.omega.memory.memory import OmegaMemory
from napari_chatgpt.omega.napari_bridge import NapariBridge, _set_viewer_info
from napari_chatgpt.omega.omega_init import initialize_omega_agent
from napari_chatgpt.utils.api_keys.api_key import set_api_key
from napari_chatgpt.utils.configuration.app_configuration import \
@@ -69,7 +68,7 @@ def __init__(self,
# Instantiate FastAPI:
self.app = FastAPI()
- # get configuration
+ # Get configuration
config = AppConfiguration('omega')
# port:
