Skip to content

Commit

Permalink
Merge pull request #12 from pydn/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
pydn authored Aug 25, 2023
2 parents 48248e0 + 9549e3f commit a1d651b
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 33 deletions.
49 changes: 35 additions & 14 deletions comfyui_to_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,21 @@
import glob
import inspect
import json
import logging
import os
import random
import sys
import re
from typing import Dict, List, Any, Callable, Tuple

import black

from utils import import_custom_nodes, add_comfyui_directory_to_sys_path, get_value_at_index

sys.path.append('../')
from utils import import_custom_nodes, find_path, add_comfyui_directory_to_sys_path, add_extra_model_paths, get_value_at_index

sys.path.append('../')
from nodes import NODE_CLASS_MAPPINGS


logging.basicConfig(level=logging.INFO)


class FileHandler:
"""Handles reading and writing files.
Expand Down Expand Up @@ -217,7 +214,7 @@ def generate_workflow(self, load_order: List, filename: str = 'generated_code_wo
continue

class_type, import_statement, class_code = self.get_class_info(class_type)
initialized_objects[class_type] = class_type.lower().strip()
initialized_objects[class_type] = self.clean_variable_name(class_type)
if class_type in self.base_node_class_mappings.keys():
import_statements.add(import_statement)
if class_type not in self.base_node_class_mappings.keys():
Expand All @@ -234,9 +231,9 @@ def generate_workflow(self, load_order: List, filename: str = 'generated_code_wo
inputs['unique_id'] = random.randint(1, 2**64)

# Create executed variable and generate code
executed_variables[idx] = f'{class_type.lower().strip()}_{idx}'
executed_variables[idx] = f'{self.clean_variable_name(class_type)}_{idx}'
inputs = self.update_inputs(inputs, executed_variables)

if is_special_function:
special_functions_code.append(self.create_function_call_code(initialized_objects[class_type], class_def.FUNCTION, executed_variables[idx], is_special_function, **inputs))
else:
Expand Down Expand Up @@ -306,11 +303,11 @@ def assemble_python_code(self, import_statements: set, speical_functions_code: L
"""
# Get the source code of the utils functions as a string
func_strings = []
for func in [add_comfyui_directory_to_sys_path, get_value_at_index]:
for func in [get_value_at_index, find_path, add_comfyui_directory_to_sys_path, add_extra_model_paths]:
func_strings.append(f'\n{inspect.getsource(func)}')
# Define static import statements required for the script
static_imports = ['import os', 'import random', 'import sys', 'from typing import Sequence, Mapping, Any, Union',
'import torch'] + func_strings + ['\n\nadd_comfyui_directory_to_sys_path()']
'import torch'] + func_strings + ['\n\nadd_comfyui_directory_to_sys_path()\nadd_extra_model_paths()\n']
# Check if custom nodes should be included
if custom_nodes:
static_imports.append(f'\n{inspect.getsource(import_custom_nodes)}\n')
Expand All @@ -328,7 +325,7 @@ def assemble_python_code(self, import_statements: set, speical_functions_code: L
final_code = black.format_str(final_code, mode=black.Mode())

return final_code

def get_class_info(self, class_type: str) -> Tuple[str, str, str]:
"""Generates and returns necessary information about class type.
Expand All @@ -339,12 +336,36 @@ def get_class_info(self, class_type: str) -> Tuple[str, str, str]:
Tuple[str, str, str]: Updated class type, import statement string, class initialization code.
"""
import_statement = class_type
variable_name = self.clean_variable_name(class_type)
if class_type in self.base_node_class_mappings.keys():
class_code = f'{class_type.lower().strip()} = {class_type.strip()}()'
class_code = f'{variable_name} = {class_type.strip()}()'
else:
class_code = f'{class_type.lower().strip()} = NODE_CLASS_MAPPINGS["{class_type}"]()'
class_code = f'{variable_name} = NODE_CLASS_MAPPINGS["{class_type}"]()'

return class_type, import_statement, class_code

@staticmethod
def clean_variable_name(class_type: str) -> str:
"""
Remove any characters from variable name that could cause errors running the Python script.
Args:
class_type (str): Class type.
Returns:
str: Cleaned variable name with no special characters or spaces
"""
# Convert to lowercase and replace spaces with underscores
clean_name = class_type.lower().strip().replace("-", "_").replace(" ", "_")

# Remove characters that are not letters, numbers, or underscores
clean_name = re.sub(r'[^a-z0-9_]', '', clean_name)

# Ensure that it doesn't start with a number
if clean_name[0].isdigit():
clean_name = "_" + clean_name

return clean_name

def get_function_parameters(self, func: Callable) -> List:
"""Get the names of a function's parameters.
Expand Down
62 changes: 43 additions & 19 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,56 @@ def import_custom_nodes() -> None:
# Initializing custom nodes
init_custom_nodes()

def add_comfyui_directory_to_sys_path() -> None:

def find_path(name: str, path: str = None) -> str:
"""
Recursively looks at parent folders starting from the current working directory until it finds 'ComfyUI'.
Once found, the directory is added to sys.path.
Recursively looks at parent folders starting from the given path until it finds the given name.
Returns the path as a Path object if found, or None otherwise.
"""
start_path = os.getcwd() # Get the current working directory
# If no path is given, use the current working directory
if path is None:
path = os.getcwd()

# Check if the current directory contains the name
if name in os.listdir(path):
path_name = os.path.join(path, name)
print(f"{name} found: {path_name}")
return path_name

# Get the parent directory
parent_directory = os.path.dirname(path)

def search_directory(path: str) -> None:
# Check if the current directory contains 'ComfyUI'
if 'ComfyUI' in os.listdir(path):
directory_path = os.path.join(path, 'ComfyUI')
sys.path.append(directory_path)
print(f"ComfyUI found and added to sys.path: {directory_path}")
# If the parent directory is the same as the current directory, we've reached the root and stop the search
if parent_directory == path:
return None

# Get the parent directory
parent_directory = os.path.dirname(path)
# Recursively call the function with the parent directory
return find_path(name, parent_directory)

# If the parent directory is the same as the current directory, we've reached the root and stop the search
if parent_directory == path:
return

# Recursively call the function with the parent directory
search_directory(parent_directory)
def add_comfyui_directory_to_sys_path() -> None:
"""
Add 'ComfyUI' to the sys.path
"""
comfyui_path = find_path('ComfyUI')
if comfyui_path is not None and os.path.isdir(comfyui_path):
sys.path.append(comfyui_path)
print(f"'{comfyui_path}' added to sys.path")

# Start the search from the current working directory
search_directory(start_path)

def add_extra_model_paths() -> None:
"""
Parse the optional extra_model_paths.yaml file and add the parsed paths to the sys.path.
"""
from main import load_extra_path_config

extra_model_paths = find_path("extra_model_paths.yaml")

if extra_model_paths is not None:
load_extra_path_config(extra_model_paths)
else:
print("Could not find the extra_model_paths config file.")



def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
Expand Down

0 comments on commit a1d651b

Please sign in to comment.