
Commit

Use run returned from pipeline dispatcher
nathandf committed Nov 7, 2024
1 parent ffe4deb commit 0d1b0a9
Showing 4 changed files with 30 additions and 38 deletions.
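
In short: PipelineDispatcher.dispatch() now returns the pipeline run it was handed, the PipelineRuns view captures that return value and responds with a ModelResponse instead of hand-formatting the model into a dict, and the engine's Server.py and main.py move the "[SERVER]" log prefix out of the logging formatter and into each log call via lbuffer_str.
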
2 changes: 1 addition & 1 deletion src/api/src/backend/services/PipelineDispatcher.py
@@ -60,7 +60,7 @@ def dispatch(self, service_request: dict, pipeline, pipeline_run=None):
logger.error(message)
logger.exception(e.__cause__)
raise ServerError(message=message)

return pipeline_run

def _uuid_convert(self, obj):
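
Note: a minimal sketch of the dispatcher contract after this change. Only the lines visible in the hunk above come from the source; the class body, the _publish step, and the error message text are placeholders for illustration, not the real implementation.

import logging

logger = logging.getLogger("api")

class ServerError(Exception):
    def __init__(self, message=None):
        super().__init__(message)

class PipelineDispatcher:
    def dispatch(self, service_request: dict, pipeline, pipeline_run=None):
        try:
            self._publish(service_request)  # placeholder for the real submission logic
        except Exception as e:
            message = "Failed to dispatch the pipeline run"  # assumed message text
            logger.error(message)
            logger.exception(e.__cause__)
            raise ServerError(message=message)
        # New in this commit: return the run so the caller can respond with it directly
        return pipeline_run

    def _publish(self, service_request):
        pass  # stub; the real dispatcher submits the request to the workflow engine
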
16 changes: 4 additions & 12 deletions src/api/src/backend/views/PipelineRuns.py
@@ -13,6 +13,7 @@
from backend.models import PipelineRun, Pipeline, TERMINAL_STATUSES
from backend.helpers.PipelineDispatchRequestBuilder import PipelineDispatchRequestBuilder
from backend.services.PipelineDispatcher import service as pipeline_dispatcher
from backend.views.http.responses.models import ModelResponse
from backend.errors.api import ServerError
from backend.utils import logger
from backend.services.CredentialsService import service as credentials_service
@@ -76,25 +77,17 @@ def post(self, request, group_id, pipeline_id, pipeline_run_uuid):
run=pipeline_run,
)
# Dispatch the request
pipeline_dispatcher.dispatch(pipeline_dispatch_request, pipeline, pipeline_run=pipeline_run)
run = pipeline_dispatcher.dispatch(pipeline_dispatch_request, pipeline, pipeline_run=pipeline_run)
except ServerError as e:
return ServerErrorResp(message=str(e))
except Exception as e:
return ServerErrorResp(message=str(e))


# Format the started at and last_modified
run = model_to_dict(pipeline_run)

run["started_at"] = run["started_at"].strftime("%Y-%m-%d %H:%M:%S") if run["started_at"] else None
run["last_modified"] = run["last_modified"].strftime("%Y-%m-%d %H:%M:%S") if run["last_modified"] else None

return BaseResponse(
status=200,
success=True,
message="success",
result=run
)
# Respond with the pipeline run
return ModelResponse(run)

# TODO catch the specific error thrown by the group service
except (DatabaseError, IntegrityError, OperationalError) as e:
@@ -103,7 +96,6 @@ def post(self, request, group_id, pipeline_id, pipeline_run_uuid):
except Exception as e:
logger.exception(e.__cause__)
return ServerError(message=e)


def get(self, request, group_id, pipeline_id, pipeline_run_uuid=None, *_, **__):
try:
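
Note: putting the hunks above together, the response path in post() now reads roughly as below. This is a sketch assembled from the visible diff lines, not a standalone snippet; ServerErrorResp, pipeline_dispatcher, and the request-building code are unchanged context from the original view.

def post(self, request, group_id, pipeline_id, pipeline_run_uuid):
    ...  # earlier body omitted (group checks, request building; see the hunks above)
    try:
        # The dispatcher now returns the run it operated on
        run = pipeline_dispatcher.dispatch(
            pipeline_dispatch_request, pipeline, pipeline_run=pipeline_run
        )
    except ServerError as e:
        return ServerErrorResp(message=str(e))
    except Exception as e:
        return ServerErrorResp(message=str(e))

    # Respond with the pipeline run; ModelResponse replaces the removed
    # model_to_dict + strftime formatting, so the view no longer hand-builds the dict
    return ModelResponse(run)
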
47 changes: 24 additions & 23 deletions src/engine/src/Server.py
@@ -32,11 +32,12 @@
DUPLICATE_SUBMISSION_POLICY_DEFER,
PLUGINS,
)
from utils import lbuffer_str
from owe_python_sdk.schema import WorkflowSubmissionRequest, EmptyObject

from workers import WorkerPool
from workflows import WorkflowExecutor
from utils import deserialize_message, load_plugins
from utils import deserialize_message, load_plugins, lbuffer_str
from errors import NoAvailableWorkers, WorkflowTerminated


@@ -56,16 +57,16 @@ def __call__(self):
workers, establishes a connection with RabbitMQ, creates the channel,
exchanges, and queues, and begins consuming from the inbound queue"""

logger.info(f"Starting server")
logger.info(f"{lbuffer_str('[SERVER]')} Starting server")

# Initialize plugins
logger.info(f"Loading plugins {PLUGINS}")
logger.info(f"{lbuffer_str('[SERVER]')} Loading plugins {PLUGINS}")
self.plugins = load_plugins(PLUGINS)

# Create a worker pool that consists of the workflow executors that will
# run the pipelines
# TODO catch error for worker classes that dont inherit from "Worker"
logger.info(f"Starting {STARTING_WORKERS} workers. Max workers ({MAX_WORKERS})")
logger.info(f"{lbuffer_str('[SERVER]')} Starting {STARTING_WORKERS} workers. Max workers ({MAX_WORKERS})")
self.worker_pool = WorkerPool(
worker_cls=WorkflowExecutor,
starting_worker_count=STARTING_WORKERS,
@@ -74,8 +75,8 @@ def __call__(self):
"plugins": self.plugins
}
)
logger.debug(f"Worker initialization complete")
logger.debug(f"Available workers ({self.worker_pool.count()})")
logger.debug(f"{lbuffer_str('[SERVER]')} Worker initialization complete")
logger.debug(f"{lbuffer_str('[SERVER]')} Available workers ({self.worker_pool.count()})")

# Connect to the message broker
connection = self._connect()
@@ -102,7 +103,7 @@ def __call__(self):
)
)

logger.debug(f"Server started and ready to recieve workflow submissions.")
logger.debug(f"{lbuffer_str('[SERVER]')} Server started and ready to recieve workflow submissions.")

channel.start_consuming()

@@ -111,17 +112,17 @@ def __call__(self):
thread.join()

connection.close()
logger.info(f"Closing connection to message broker")
logger.info(f"{lbuffer_str('[SERVER]')} Closing connection to message broker")

# Occurs when basic_consume recieves the wrong args
except ValueError as e:
logger.critical(f"Critical Workflow Executor Error: {e}")
logger.critical(f"{lbuffer_str('[SERVER]')} Critical Error: {e}")
# Cathes all ampq errors from .start_consuming()
except AMQPError as e:
logger.error(f"{e.__class__.__name__} - {e}")
logger.error(f"{lbuffer_str('[SERVER]')} {e.__class__.__name__} - {e}")
# Catch all other exceptions
except Exception as e:
logger.error(e)
logger.error(f"{lbuffer_str('[SERVER]')} {e}")

def _on_message_callback(self, channel, method, _, body, args):
'''
@@ -137,7 +138,7 @@ def _on_message_callback(self, channel, method, _, body, args):
try:
request = WorkflowSubmissionRequest(**deserialize_message(body))
except JSONDecodeError as e:
logger.error(e)
logger.error(f"{lbuffer_str('[SERVER]')} {e}")
channel.basic_reject(method.delivery_tag, requeue=False)
return

@@ -150,7 +151,7 @@ def _on_message_callback(self, channel, method, _, body, args):
for plugin in self.plugins:
request = plugin.dispatch("request", request)
except Exception as e:
logger.error(e)
logger.error(f"{lbuffer_str('[SERVER]')} {e}")
channel.basic_reject(method.delivery_tag, requeue=False)
return

@@ -163,7 +164,7 @@ def _on_message_callback(self, channel, method, _, body, args):
try:
worker = self.worker_pool.check_out()
except NoAvailableWorkers:
logger.info(f"Insufficient workers available. RETRYING (10s)")
logger.info(f"{lbuffer_str('[SERVER]')} Insufficient workers available. RETRYING (10s)")
connection.add_callback_threadsafe(
partial(
self._ack_nack,
@@ -213,7 +214,7 @@ def _dispatch(self, worker, request):
t.join()
except Exception as e:
# Deregister and return executor back to the worker pool
logger.error(e)
logger.error(f"{lbuffer_str('[SERVER]')} {e}")

# Handle TERMINATE directive
if "TERMINATE_RUN" in directives:
@@ -263,7 +264,7 @@ def _connect(self):
os.environ["BROKER_USER"], os.environ["BROKER_PASSWORD"])
)

logger.info(f"Connecting to message broker")
logger.info(f"{lbuffer_str('[SERVER]')} Connecting to message broker")

connected = False
connection_attempts = 0
@@ -273,15 +274,15 @@ def _connect(self):
connection = pika.BlockingConnection(connection_parameters)
connected = True
except Exception:
logger.info(f"Connection failed ({connection_attempts})")
logger.info(f"{lbuffer_str('[SERVER]')} Connection failed ({connection_attempts})")
time.sleep(CONNECTION_RETRY_DELAY)

# Kill the build service if unable to connect
if connected == False:
logger.critical(f"Error: Maximum connection attempts reached ({MAX_CONNECTION_ATTEMPTS}). Unable to connect to message broker.")
logger.critical(f"{lbuffer_str('[SERVER]')} Error: Maximum connection attempts reached ({MAX_CONNECTION_ATTEMPTS}). Unable to connect to message broker.")
sys.exit(1)

logger.info(f"Connected to message broker established")
logger.info(f"{lbuffer_str('[SERVER]')} Connection to message broker established")

return connection

@@ -310,7 +311,7 @@ def _register_worker(self, request, worker):
active_worker.terminate()
self._deregister_worker(active_worker, terminated=True)
elif policy == DUPLICATE_SUBMISSION_POLICY_DEFER:
logger.info(f"Warning: Duplicate Submission Policy of 'DEFER' not implemented. Handling as 'ALLOW'")
logger.info(f"{lbuffer_str('[SERVER]')} Warning: Duplicate Submission Policy of 'DEFER' not implemented. Handling as 'ALLOW'")
pass
elif policy == DUPLICATE_SUBMISSION_POLICY_ALLOW:
pass
@@ -337,7 +338,7 @@ def _declare_queue(self, channel, queue, exclusive=True):
try:
return channel.queue_declare(queue=queue, exclusive=exclusive)
except ChannelClosedByBroker as e:
logger.critical(f"Exclusive queue declaration error for queue '{queue}' | {e}")
logger.critical(f"{lbuffer_str('[SERVER]')} Exclusive queue declaration error for queue '{queue}' | {e}")
sys.exit(1)

def _resolve_idempotency_key(self, request):
Expand Down Expand Up @@ -380,9 +381,9 @@ def _resolve_idempotency_key(self, request):
idempotency_key = idempotency_key + part_delimiter + str(key_part)
return idempotency_key
except (AttributeError, TypeError) as e:
logger.info(f"Warning: Failed to resolve idempotency key from provided constraints. {str(e)}. Defaulted to pipeline run uuid '{default_idempotency_key}'")
logger.info(f"{lbuffer_str('[SERVER]')} Warning: Failed to resolve idempotency key from provided constraints. {str(e)}. Defaulted to pipeline run uuid '{default_idempotency_key}'")
except Exception as e:
logger.info(f"Any unknown error occured resolving idempotency key | {str(e)}. Defaulted to pipeline run uuid '{default_idempotency_key}'")
logger.info(f"{lbuffer_str('[SERVER]')} Any unknown error occured resolving idempotency key | {str(e)}. Defaulted to pipeline run uuid '{default_idempotency_key}'")

return default_idempotency_key
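
Note: every logging change in this file follows one pattern: the "[SERVER]" tag is now written at the call site via lbuffer_str instead of being injected by the formatter (see the main.py hunk below). lbuffer_str is imported from utils but its implementation is not shown in this diff; the sketch below assumes it simply pads the tag to a fixed width so log columns stay aligned.

import logging, sys

def lbuffer_str(tag: str, width: int = 12) -> str:
    # Assumption: left-justify the tag in a fixed-width buffer
    return tag.ljust(width)

logger = logging.getLogger("server")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

logger.info(f"{lbuffer_str('[SERVER]')} Starting server")
# prints something like: "[SERVER]     Starting server"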

3 changes: 1 addition & 2 deletions src/engine/src/main.py
@@ -1,7 +1,6 @@
import sys, logging

from Server import Server
from utils import lbuffer_str
from conf.constants import (
INBOUND_EXCHANGE,
INBOUND_QUEUE,
@@ -14,7 +13,7 @@

server_logger = logging.getLogger("server")
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter(f"{lbuffer_str('[SERVER]')} %(message)s"))
handler.setFormatter(logging.Formatter(f"%(message)s"))
server_logger.setLevel(logging.DEBUG)
server_logger.addHandler(handler)

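
Note: with the prefix gone from the formatter, the handler in main.py emits each record as-is, and the "[SERVER]" tag appears only because Server.py adds it per call. A quick illustration under the same lbuffer_str assumption as above:

import logging, sys

def lbuffer_str(tag: str, width: int = 12) -> str:
    # Same assumption as the earlier sketch: pad the tag to a fixed width
    return tag.ljust(width)

server_logger = logging.getLogger("server")
handler = logging.StreamHandler(stream=sys.stdout)
# After this commit the formatter passes messages through unchanged...
handler.setFormatter(logging.Formatter("%(message)s"))
server_logger.setLevel(logging.DEBUG)
server_logger.addHandler(handler)

# ...so the prefix shows up only when the call site supplies it,
# which is what Server.py now does everywhere.
server_logger.info(f"{lbuffer_str('[SERVER]')} Server started")
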
