Linting, reverted many test changes.
Danidite committed Oct 31, 2024
1 parent a47f561 commit f2c54be
Showing 15 changed files with 81 additions and 79 deletions.
8 changes: 4 additions & 4 deletions benchmarks/dna_visualization/.caribou/config.yml
@@ -22,10 +22,10 @@ regions_and_providers:
allowed_regions:
- provider: "aws"
region: "us-east-1"
# - provider: "aws"
# region: "us-west-1"
# - provider: "aws"
# region: "us-west-2"
- provider: "aws"
region: "us-west-1"
- provider: "aws"
region: "us-west-2"
- provider: "aws"
region: "ca-central-1"
disallowed_regions:
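This hunk restores us-west-1 and us-west-2 to the benchmark's allowed_regions. As a rough illustration of how such a config section can be consumed, here is a short Python sketch that reads the file and lists the permitted provider/region pairs; the file path comes from this diff, while the assumption that regions_and_providers is a top-level YAML key follows the hunk header above.

```python
import yaml  # PyYAML

# Sketch: load the benchmark's Caribou config and print the regions the
# deployment planner may consider (keys as shown in the diff above).
with open("benchmarks/dna_visualization/.caribou/config.yml", encoding="utf-8") as f:
    config = yaml.safe_load(f)

for entry in config["regions_and_providers"]["allowed_regions"]:
    print(f'{entry["provider"]}:{entry["region"]}')  # e.g. aws:us-east-1
```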
2 changes: 1 addition & 1 deletion benchmarks/dna_visualization/app.py
@@ -16,7 +16,7 @@


@workflow.serverless_function(
name="Visualize",
name="visualize",
entry_point=True,
)
def visualize(event: dict[str, Any]) -> dict[str, Any]:
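The only change here is the registered function name ("Visualize" to "visualize"), making it match the Python function's own name. For context, a minimal sketch of how such an entry point is declared; the CaribouWorkflow import path and constructor arguments are assumptions, only the decorator arguments and the function signature come from this diff.

```python
from typing import Any

from caribou.deployment.client import CaribouWorkflow  # assumed import path

# Hypothetical workflow object; the real app.py defines its own instance.
workflow = CaribouWorkflow(name="dna_visualization", version="0.0.1")


@workflow.serverless_function(
    name="visualize",   # lower-case name introduced by this commit
    entry_point=True,   # marks the workflow's entry point
)
def visualize(event: dict[str, Any]) -> dict[str, Any]:
    # Placeholder body; the real function renders the DNA visualization.
    return {"status": 200}
```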
8 changes: 4 additions & 4 deletions benchmarks/image_processing/.caribou/config.yml
@@ -22,10 +22,10 @@ regions_and_providers:
allowed_regions:
- provider: "aws"
region: "us-east-1"
# - provider: "aws"
# region: "us-west-1"
# - provider: "aws"
# region: "us-west-2"
- provider: "aws"
region: "us-west-1"
- provider: "aws"
region: "us-west-2"
- provider: "aws"
region: "ca-central-1"
disallowed_regions:
10 changes: 6 additions & 4 deletions caribou/common/constants.py
@@ -134,14 +134,14 @@

# Log-Syncer parameters
## Forgetting factors
FORGETTING_TIME_DAYS = 130 # 30 days
FORGETTING_TIME_DAYS = 30 # 30 days
FORGETTING_NUMBER = 5000 # 5000 invocations
KEEP_ALIVE_DATA_COUNT = 10 # Keep a sample if it is part of any of the 10 samples for any execution or transmission
MIN_TIME_BETWEEN_SYNC = 15 # 15 Minutes
MIN_TIME_BETWEEN_SYNC = 15 # In Minutes

## Grace period for the log-syncer
## Used as lambda insights can be delayed
BUFFER_LAMBDA_INSIGHTS_GRACE_PERIOD = 15 # 15 minutes
BUFFER_LAMBDA_INSIGHTS_GRACE_PERIOD = 15 # In minutes

## Successor task types
REDIRECT_ONLY_TASK_TYPE = "REDIRECT_ONLY"
@@ -162,4 +162,6 @@
GO_PATH = pathlib.Path(__file__).parents[2].resolve() / "caribou-go"

# AWS Lambda Timeout
AWS_TIMEOUT_SECONDS = 800 # Lambda functions must terminate in 900 seconds, we leave 100 seconds as buffer time
AWS_TIMEOUT_SECONDS = (
800 # Lambda functions must terminate in 900 seconds, we leave some time as buffer time (For other operations)
)
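Most of these edits only correct comments, but FORGETTING_TIME_DAYS drops from 130 to 30, bringing the value in line with its own "30 days" comment and shortening the log-syncer's retention window. A minimal sketch of how such a forgetting window might be applied; the is_log_expired helper is hypothetical and not part of this commit.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Values from caribou/common/constants.py after this commit
FORGETTING_TIME_DAYS = 30   # logs older than 30 days are forgotten
FORGETTING_NUMBER = 5000    # or after 5000 invocations
AWS_TIMEOUT_SECONDS = 800   # leaves ~100 s of Lambda's 900 s limit as buffer


def is_log_expired(log_timestamp: datetime, now: Optional[datetime] = None) -> bool:
    """Hypothetical helper: True once a (timezone-aware) log entry falls outside the window."""
    now = now or datetime.now(timezone.utc)
    return now - log_timestamp > timedelta(days=FORGETTING_TIME_DAYS)
```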
2 changes: 1 addition & 1 deletion caribou/common/models/endpoints.py
@@ -59,4 +59,4 @@ def get_datastore_client(self) -> RemoteClient:
return self._data_store_client

def get_framework_cli_remote_client(self) -> AWSRemoteClient:
return self._framework_cli_remote_client
return self._framework_cli_remote_client
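The change here only adds the missing end-of-file newline, but get_framework_cli_remote_client is the accessor the CLI uses before every remote command. A short sketch of that pattern, mirroring the guard that appears in the cli.py hunks later in this diff (the raised exception type is a placeholder):

```python
from caribou.common.models.endpoints import Endpoints
from caribou.deployment.client.remote_cli.remote_cli import is_aws_framework_deployed

# Mirror of the check used by the remote CLI commands shown later in this commit.
framework_cli_remote_client = Endpoints().get_framework_cli_remote_client()
if not is_aws_framework_deployed(framework_cli_remote_client, verbose=False):
    raise RuntimeError("The remote framework is not deployed.")
```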
12 changes: 5 additions & 7 deletions caribou/common/models/remote_client/aws_remote_client.py
@@ -15,9 +15,9 @@
CARIBOU_WORKFLOW_IMAGES_TABLE,
DEPLOYMENT_RESOURCES_BUCKET,
GLOBAL_SYSTEM_REGION,
REMOTE_CARIBOU_CLI_FUNCTION_NAME,
SYNC_MESSAGES_TABLE,
SYNC_PREDECESSOR_COUNTER_TABLE,
REMOTE_CARIBOU_CLI_FUNCTION_NAME
)
from caribou.common.models.remote_client.remote_client import RemoteClient
from caribou.common.utils import compress_json_str, decompress_json_str
@@ -1139,7 +1139,7 @@ def create_timer_rule(
Targets=[{"Id": f"{lambda_function_name}-target", "Arn": lambda_arn, "Input": event_payload}],
)

def invoke_remote_framework_internal_action(self, action_type: str, action_events: dict[Any]) -> None:
def invoke_remote_framework_internal_action(self, action_type: str, action_events: dict[str, Any]) -> None:
payload = {
"action": "internal_action",
"type": action_type,
@@ -1148,14 +1148,12 @@ def invoke_remote_framework_internal_action(self, action_type: str, action_event

self.invoke_remote_framework_with_payload(payload, invocation_type="Event")

def invoke_remote_framework_with_payload(self, payload: dict[Any], invocation_type: str = "Event") -> None:
def invoke_remote_framework_with_payload(self, payload: dict[str, Any], invocation_type: str = "Event") -> None:
# Get the boto3 lambda client
lambda_client = self._client("lambda")
remote_framework_cli_name = REMOTE_CARIBOU_CLI_FUNCTION_NAME

# Invoke the lambda function with the payload
lambda_client.invoke(
FunctionName=remote_framework_cli_name,
InvocationType=invocation_type,
Payload=json.dumps(payload)
)
FunctionName=remote_framework_cli_name, InvocationType=invocation_type, Payload=json.dumps(payload)
)
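The type-hint tightening (dict[Any] to dict[str, Any]) and the reflowed invoke call do not change behaviour: the helper still performs a fire-and-forget boto3 lambda_client.invoke against the remote framework CLI function. A hedged usage sketch follows; the action type and workflow id are placeholders, while the class, constructor, and method signature come from this diff.

```python
from caribou.common.constants import GLOBAL_SYSTEM_REGION
from caribou.common.models.remote_client.aws_remote_client import AWSRemoteClient

client = AWSRemoteClient(GLOBAL_SYSTEM_REGION)

# Fire-and-forget ("Event") invocation of the remote framework CLI Lambda.
# The wrapper builds {"action": "internal_action", "type": ..., "event": ...}
# and sends it as the JSON payload.
client.invoke_remote_framework_internal_action(
    action_type="sync_workflow",  # assumed action type name
    action_events={"workflow_id": "example-workflow-0.0.1"},  # placeholder id
)
```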
3 changes: 2 additions & 1 deletion caribou/common/models/remote_client/remote_client_factory.py
@@ -4,6 +4,7 @@
from caribou.common.models.remote_client.remote_client import RemoteClient
from caribou.common.provider import Provider


class RemoteClientFactory:
@staticmethod
def get_remote_client(provider: str, region: str) -> RemoteClient:
@@ -23,4 +24,4 @@ def get_remote_client(provider: str, region: str) -> RemoteClient:

@staticmethod
def get_framework_cli_remote_client(region: str) -> AWSRemoteClient:
return AWSRemoteClient(region)
return AWSRemoteClient(region)
30 changes: 14 additions & 16 deletions caribou/deployment/client/cli/cli.py
@@ -1,11 +1,10 @@
import json
import os
from typing import Optional
from typing import Any, Optional

from caribou.common.models.endpoints import Endpoints
import click
from cron_descriptor import Options, get_description

from caribou.common.models.endpoints import Endpoints
from caribou.common.setup.setup_tables import main as setup_tables_func
from caribou.common.teardown.teardown_tables import main as teardown_tables_func
from caribou.data_collector.components.carbon.carbon_collector import CarbonCollector
@@ -15,17 +14,17 @@
from caribou.deployment.client import __version__ as CARIBOU_VERSION
from caribou.deployment.client.cli.new_workflow import create_new_workflow_directory
from caribou.deployment.client.remote_cli.remote_cli import (
action_type_to_function_name,
deploy_remote_framework,
get_all_available_timed_cli_functions,
get_all_default_timed_cli_functions,
get_cli_invoke_payload,
is_aws_framework_deployed,
remove_aws_timers,
remove_remote_framework,
report_timer_schedule_expression,
setup_aws_timers,
valid_framework_dir,
is_aws_framework_deployed,
get_cli_invoke_payload,
action_type_to_function_name,
)
from caribou.deployment.common.config.config import Config
from caribou.deployment.common.deploy.deployer import Deployer
@@ -90,22 +89,18 @@ def run(_: click.Context, argument: Optional[str], workflow_id: str) -> None:
@click.option("-r", "--remote", is_flag=True, help="Run the command on the remote framework.")
@click.pass_context
def data_collect(_: click.Context, collector: str, workflow_id: Optional[str], remote: bool) -> None:
if collector in ("workflow"):
if workflow_id is None:
raise click.ClickException("Workflow id must be provided for the workflow collector.")

if remote:
framework_cli_remote_client = Endpoints().get_framework_cli_remote_client()
framework_deployed: bool = is_aws_framework_deployed(framework_cli_remote_client, verbose=False)
if not framework_deployed:
raise click.ClickException("The remote framework is not deployed.")

print(f"Running {collector} collector on the remote framework.")

# Now we know the framework is deployed, we can run the command
action: str = "data_collect"
function_type: str = action_type_to_function_name(action)
event_payload = get_cli_invoke_payload(function_type)
event_payload: dict[str, Any] = get_cli_invoke_payload(function_type)

# Now add the collector and workflow_id
event_payload["collector"] = collector
@@ -126,7 +121,10 @@ def data_collect(_: click.Context, collector: str, workflow_id: Optional[str], r
print("Running performance collector")
performance_collector = PerformanceCollector()
performance_collector.run()
if collector in ("workflow"): # For workflow collector, we need to provide the workflow_id
if collector in ("workflow"):
if workflow_id is None:
raise click.ClickException("Workflow id must be provided for the workflow collector.")

print("Running workflow collector")
workflow_collector = WorkflowCollector()
workflow_collector.run_on_workflow(workflow_id)
@@ -140,7 +138,7 @@ def log_sync(remote: bool) -> None:
framework_deployed: bool = is_aws_framework_deployed(framework_cli_remote_client, verbose=False)
if not framework_deployed:
raise click.ClickException("The remote framework is not deployed.")

print("Running log syncer on the remote framework.")

# Now we know the framework is deployed, we can run the command
@@ -161,7 +159,7 @@ def manage_deployments(remote: bool) -> None:
framework_deployed: bool = is_aws_framework_deployed(framework_cli_remote_client, verbose=False)
if not framework_deployed:
raise click.ClickException("The remote framework is not deployed.")

print("Running deployment manager on the remote framework.")

# Now we know the framework is deployed, we can run the command
@@ -253,7 +251,7 @@ def remove(workflow_id: str, remote: bool) -> None:
framework_deployed: bool = is_aws_framework_deployed(framework_cli_remote_client, verbose=False)
if not framework_deployed:
raise click.ClickException("The remote framework is not deployed.")

print(f"Removing workflow {workflow_id} on the remote framework.")

# Now we know the framework is deployed, we can run the command
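Besides the import reordering, the data_collect command now validates workflow_id only when the workflow collector actually runs, and annotates the remote payload as dict[str, Any]. The remote path reduces to the following flow, a sketch assembled from the lines above; the collector value is a placeholder.

```python
from typing import Any, Optional

from caribou.deployment.client.remote_cli.remote_cli import (
    action_type_to_function_name,
    get_cli_invoke_payload,
)

action: str = "data_collect"
function_type: str = action_type_to_function_name(action)        # -> "data_collector"
event_payload: dict[str, Any] = get_cli_invoke_payload(function_type)

# Collector-specific fields attached before invoking the remote framework.
event_payload["collector"] = "carbon"              # placeholder collector name
workflow_id: Optional[str] = None                  # only required for the workflow collector
event_payload["workflow_id"] = workflow_id
```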
8 changes: 5 additions & 3 deletions caribou/deployment/client/remote_cli/remote_cli.py
@@ -256,17 +256,18 @@ def get_cli_invoke_payload(function_name: str) -> dict[str, str]:

return function_name_to_payload[function_name]


def action_type_to_function_name(action_type: str) -> str:
'''
"""
Only for direct translations of action types to function names.
I.e., no custom logic, additional parameters, or data collection.
'''
"""
action_type_to_function_name = {
"log_sync": "log_syncer",
"manage_deployments": "deployment_manager",
"run_deployment_migrator": "deployment_migrator",
"data_collect": "data_collector",
"remove_workflow": "remove_workflow"
"remove_workflow": "remove_workflow",
}

function_name: Optional[str] = action_type_to_function_name.get(action_type, None)
@@ -276,6 +277,7 @@ def action_type_to_function_name(action_type: str) -> str:

return function_name


def setup_aws_timers(new_rules: list[tuple[str, str]]) -> None:
"""Create or update CloudWatch Event rules for Lambda functions."""
aws_remote_client = AWSRemoteClient(GLOBAL_SYSTEM_REGION)
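With the docstring switched to double quotes and the mapping reformatted, action_type_to_function_name remains a plain lookup table. Its effect for the entries shown above:

```python
from caribou.deployment.client.remote_cli.remote_cli import action_type_to_function_name

assert action_type_to_function_name("log_sync") == "log_syncer"
assert action_type_to_function_name("manage_deployments") == "deployment_manager"
assert action_type_to_function_name("data_collect") == "data_collector"
assert action_type_to_function_name("remove_workflow") == "remove_workflow"
```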
32 changes: 13 additions & 19 deletions caribou/deployment/client/remote_cli/remote_cli_handler.py
@@ -39,6 +39,7 @@ def caribou_cli(event: dict[str, Any], context: dict[str, Any]) -> dict[str, Any
handler = action_handlers.get(action, handle_default)
return handler(event)


def handle_run_deployment_migrator(event: dict[str, Any]) -> dict[str, Any]: # pylint: disable=unused-argument
function_deployment_monitor = DeploymentMigrator(deployed_remotely=True)
function_deployment_monitor.check()
@@ -153,11 +154,11 @@ def handle_default(event: dict[str, Any]) -> dict[str, Any]: # pylint: disable=


def handle_internal_action(event: dict[str, Any]) -> dict[str, Any]:
'''
"""
Handle special actions that are not part of the standard CLI actions.
These actions are part of internal operations and are not intended for
direct use by the user or timer.
'''
"""
action_type = event.get("type", None)
if not action_type:
logger.error("No action_type specified (Should never happen, please report this)!")
@@ -175,11 +176,13 @@ def handle_internal_action(event: dict[str, Any]) -> dict[str, Any]:
action_event = event.get("event", None)
return handler(action_event)


## Special action handlers (For specific actions that are not part of the standard CLI actions)
def _handle_default_special(event: dict[str, Any]) -> dict[str, Any]: # pylint: disable=unused-argument
logger.error("Unknown special action (Should never happen, please report this)!")
return {"status": 400, "message": "Unknown special action"}


def _handle_check_workflow(event: dict[str, Any]) -> dict[str, Any]:
workflow_id: Optional[str] = event.get("workflow_id", None)
if workflow_id is None:
@@ -193,10 +196,8 @@ def _handle_check_workflow(event: dict[str, Any]) -> dict[str, Any]:

deployment_manager = DeploymentManager(deployment_metrics_calculator_type, deployed_remotely=True)
deployment_manager.check_workflow(workflow_id)
return {
"status": 200,
"message": f"Workflow {workflow_id} checked"
}
return {"status": 200, "message": f"Workflow {workflow_id} checked"}


def _handle_run_deployment_algorithm(event: dict[str, Any]) -> dict[str, Any]:
deployment_metrics_calculator_type: Optional[str] = event.get("deployment_metrics_calculator_type", None)
@@ -213,18 +214,16 @@ def _handle_run_deployment_algorithm(event: dict[str, Any]) -> dict[str, Any]:
if solve_hours is None:
logger.error("No solve_hours specified")
return {"status": 400, "message": "No solve_hours specified"}

leftover_tokens: Optional[int] = event.get("leftover_tokens", None)
if leftover_tokens is None:
logger.error("No leftover_tokens specified")
return {"status": 400, "message": "No leftover_tokens specified"}

deployment_manager = DeploymentManager(deployment_metrics_calculator_type, deployed_remotely=True)
deployment_manager.run_deployment_algorithm(workflow_id, solve_hours, leftover_tokens)
return {
"status": 200,
"message": f"Deployment algorithm performed on {workflow_id}"
}
return {"status": 200, "message": f"Deployment algorithm performed on {workflow_id}"}


def _handle_re_deploy_workflow(event: dict[str, Any]) -> dict[str, Any]:
workflow_id: Optional[str] = event.get("workflow_id", None)
@@ -234,10 +233,8 @@ def _handle_re_deploy_workflow(event: dict[str, Any]) -> dict[str, Any]:

re_deployment_server = ReDeploymentServer(workflow_id)
re_deployment_server.run()
return {
"status": 200,
"message": f"Workflow {workflow_id} re-deployed"
}
return {"status": 200, "message": f"Workflow {workflow_id} re-deployed"}


def _handle_sync_workflow(event: dict[str, Any]) -> dict[str, Any]:
workflow_id: Optional[str] = event.get("workflow_id", None)
@@ -247,7 +244,4 @@ def _handle_sync_workflow(event: dict[str, Any]) -> dict[str, Any]:

log_syncer = LogSyncer(deployed_remotely=True)
log_syncer.sync_workflow(workflow_id)
return {
"status": 200,
"message": f"Workflow {workflow_id} synced"
}
return {"status": 200, "message": f"Workflow {workflow_id} synced"}
2 changes: 1 addition & 1 deletion caribou/deployment/common/deploy/deployment_packager.py
@@ -14,12 +14,12 @@
import boto3
import pip
import yaml
import zstandard

import caribou
from caribou.common.models.remote_client.remote_client import RemoteClient
from caribou.deployment.common.config.config import Config
from caribou.deployment.common.deploy.models.workflow import Workflow
import zstandard


class DeploymentPackager:
4 changes: 3 additions & 1 deletion caribou/endpoint/client.py
@@ -200,7 +200,9 @@ def remove(self) -> None:
)

# Remove entry from the deployment manager workflow info table
self._endpoints.get_deployment_manager_client().remove_key(DEPLOYMENT_MANAGER_WORKFLOW_INFO_TABLE, self._workflow_id)
self._endpoints.get_deployment_manager_client().remove_key(
DEPLOYMENT_MANAGER_WORKFLOW_INFO_TABLE, self._workflow_id
)

# Remove entry from the workflow staging area table (Pending re-deployment queue)
self._endpoints.get_deployment_manager_client().remove_key(
(3 more changed files not shown)
