Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
nathandf committed Oct 18, 2023
1 parent 3f0643e commit 3d4a262
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
6 changes: 2 additions & 4 deletions src/engine/src/core/Server.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from utils import bytes_to_json, load_plugins, lbuffer_str as lbuf
from errors import NoAvailableWorkers, WorkflowTerminated

from pprint import pprint

logger = logging.getLogger("server")

Expand Down Expand Up @@ -152,9 +151,8 @@ def _start_worker(self, body, connection, channel, delivery_tag):
acked = False # Indicates that the message as been acked
try:
# Decode the message body, then convert to an object.
json_request = json.loads(bytes_to_json(body))
pprint(json_request)
request = WorkflowSubmissionRequest(**json_request)
deserialized_request = json.loads(bytes_to_json(body))
request = WorkflowSubmissionRequest(**deserialized_request)

# Get a workflow executor worker. If there are none available,
# this will raise a "NoWorkersAvailabe" error which is handled
Expand Down
3 changes: 2 additions & 1 deletion src/engine/src/core/workflows/executors/WorkflowExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,11 @@ def _set_tasks(self, tasks):
# Add all tasks to the queue
self.state.queue = [ task for task in self.state.tasks ]

@interceptable
@interceptable()
def _prepare_pipeline(self):
# Create all of the directories needed for the pipeline to run and persist results and cache
self._prepare_pipeline_fs()
print(self.state.ctx)

# template_mapper = TemplateMapper(cache_dir=self.state.ctx.pipeline.git_cache_dir)
# if self.state.ctx.pipeline.uses != None:
Expand Down

0 comments on commit 3d4a262

Please sign in to comment.