Skip to content

Commit

Permalink
chore: cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
phil65 committed Dec 26, 2024
1 parent 02e5e8c commit f61faf0
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 23 deletions.
12 changes: 10 additions & 2 deletions src/llmling/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,16 @@ def from_file(cls, path: str | os.PathLike[str]) -> Self:
msg = f"Failed to validate configuration from {path}"
raise exceptions.ConfigError(msg) from exc
else:
msg = "Loaded raw configuration: version=%s, resources=%d"
logger.debug(msg, config.version, len(config.resources))
logger.debug("Loaded raw configuration:")
msg = "version=%s, resources=%d, tools=%d, toolsets=%d, prompts=%d"
logger.debug(
msg,
config.version,
len(config.resources),
len(config.tools),
len(config.toolsets),
len(config.prompts),
)
return config


Expand Down
15 changes: 3 additions & 12 deletions src/llmling/core/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ class ErrorStrategy(str, Enum):
RETRY = "retry" # Retry the step


class ExecutionMode(str, Enum):
SEQUENTIAL = "sequential" # Execute steps one after another
PARALLEL = "parallel" # Execute steps in parallel
CONCURRENT = "concurrent" # Execute steps as they become ready


class StepCondition(BaseModel):
"""Condition for conditional execution."""

Expand Down Expand Up @@ -98,7 +92,7 @@ class Pipeline(BaseModel):

input: str | dict[str, Any]
steps: list[PipelineStep]
mode: ExecutionMode = ExecutionMode.SEQUENTIAL
mode: Literal["sequential", "parallel"] = "sequential"
max_parallel: int = 5 # Max concurrent steps
collect_metrics: bool = False # Collect execution metrics

Expand Down Expand Up @@ -368,10 +362,7 @@ async def execute(self, **params: Any) -> Any:
results: StepResults = {}

match pipeline.mode:
case ExecutionMode.SEQUENTIAL:
case "sequential":
return await self._execute_sequential(pipeline, results)
case ExecutionMode.PARALLEL:
return await self._execute_parallel(pipeline, results)
case ExecutionMode.CONCURRENT:
# TODO: Implement dynamic scheduling based on dependencies
case "parallel":
return await self._execute_parallel(pipeline, results)
7 changes: 2 additions & 5 deletions src/llmling/processors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,8 @@ async def process(self, context: ProcessingContext) -> ProcessorResult:
content = str(result)
is_async = calling.is_async_callable(self._callable)
meta = {"function": self.config.import_path, "is_async": is_async}
return ProcessorResult(
content=content,
original_content=context.original_content,
metadata=meta,
)
orig = context.original_content
return ProcessorResult(content=content, original_content=orig, metadata=meta)
except Exception as exc:
msg = f"Processing failed: {exc}"
raise exceptions.ProcessorError(msg) from exc
Expand Down
7 changes: 3 additions & 4 deletions src/llmling/processors/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ def _validate_item(self, item: Any) -> BaseProcessor:
case ProcessorConfig():
return Processor(item) # Creates function-based processor
case _ if callable(item):
config = ProcessorConfig(
import_path=f"{item.__module__}.{item.__qualname__}",
async_execution=asyncio.iscoroutinefunction(item),
)
path = f"{item.__module__}.{item.__qualname__}"
is_coro = asyncio.iscoroutinefunction(item)
config = ProcessorConfig(import_path=path, async_execution=is_coro)
return Processor(config)
case _:
msg = f"Invalid processor type: {type(item)}"
Expand Down

0 comments on commit f61faf0

Please sign in to comment.