Skip to content

Commit

Permalink
Update Deployment Logic:
Browse files Browse the repository at this point in the history
- Kubernetes: Ensure GlobalWorker is deployed even in single worker deployments.
- Flower: Configure the server to run on the GlobalWorker.
  • Loading branch information
Kostas Filippopolitis committed Jul 15, 2024
1 parent cab4379 commit c122b0f
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 34 deletions.
16 changes: 5 additions & 11 deletions exareme2/controller/services/flower/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,12 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto):
for worker in workers_info
]

server_task_handler, server_ip, server_id = (
task_handlers[0],
workers_info[0].ip,
workers_info[0].id,
global_worker = self.worker_landscape_aggregator.get_global_worker()
server_task_handler = self._create_worker_tasks_handler(
request_id, global_worker
)
if len(task_handlers) > 1:
global_worker = self.worker_landscape_aggregator.get_global_worker()
server_task_handler = self._create_worker_tasks_handler(
request_id, global_worker
)
server_ip = global_worker.ip
server_id = global_worker.id
server_ip = global_worker.ip
server_id = global_worker.id
# Garbage Collect
server_task_handler.garbage_collect()
for handler in task_handlers:
Expand Down
2 changes: 0 additions & 2 deletions kubernetes/templates/exareme2-globalnode.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{{ if gt .Values.localnodes 1.0}} # Single node deployment
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down Expand Up @@ -190,4 +189,3 @@ spec:
- exareme2.worker.healthcheck
periodSeconds: 30
timeoutSeconds: 10
{{end}}
24 changes: 13 additions & 11 deletions tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,19 @@ def load_datasets(
:param worker_id_and_ports: A list of tuples containing worker identifiers and ports.
:param use_sockets: Flag to determine if data will be loaded via sockets.
"""
if len(worker_id_and_ports) == 1:
worker_id, port = worker_id_and_ports[0]
for file in filenames:
if file.endswith(".csv") and not file.endswith("test.csv"):
csv = os.path.join(dirpath, file)
message(
f"Loading dataset {pathlib.PurePath(csv).name} in MonetDB at port {port}...",
Level.HEADER,
)
cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}"
run(c, cmd)
return

# Load the first set of CSVs into the first worker
first_worker_csvs = sorted(
[
Expand Down Expand Up @@ -581,17 +594,6 @@ def load_test_datasets(
if not local_worker_id_and_ports:
raise Exception("Local worker config files cannot be loaded.")

# If only one local worker is specified, load the entire folder to that worker
if len(local_worker_id_and_ports) == 1:
worker_id, port = local_worker_id_and_ports[0]
cmd = f"poetry run mipdb load-folder {TEST_DATA_FOLDER} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}"
message(
f"Loading the folder '{TEST_DATA_FOLDER}' in MonetDB at port {port}...",
Level.HEADER,
)
run(c, cmd)
return

# Process each dataset in the TEST_DATA_FOLDER for local workers
for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER):
if "CDEsMetadata.json" not in filenames:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ def test_logistic_regression(get_algorithm_result):
}
input["type"] = "flower"
algorithm_result = get_algorithm_result("logistic_regression", input)
print(algorithm_result)
assert algorithm_result == {"accuracy": 0.63} or algorithm_result == {
"accuracy": 0.3819241982507289
}
assert algorithm_result == {"accuracy": 0.63}


def test_logistic_regression_with_filters(get_algorithm_result):
Expand Down Expand Up @@ -69,7 +66,4 @@ def test_logistic_regression_with_filters(get_algorithm_result):
}
input["type"] = "flower"
algorithm_result = get_algorithm_result("logistic_regression", input)
print(algorithm_result)
assert algorithm_result == {"accuracy": 0.7884615384615384} or algorithm_result == {
"accuracy": 0.22443181818181818
}
assert algorithm_result == {"accuracy": 0.7884615384615384}
14 changes: 12 additions & 2 deletions tests/algorithm_validation_tests/one_node_deployment_template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,21 @@ optional=false
enabled = false

[[workers]]
id = "localworker1"
role="LOCALWORKER"
id = "globalworker"
role="GLOBALWORKER"
rabbitmq_port=5670
monetdb_port=50000
local_monetdb_username="executor"
local_monetdb_password="executor"
public_monetdb_username="guest"
public_monetdb_password="guest"

[[workers]]
id = "localworker1"
role="LOCALWORKER"
rabbitmq_port=5671
monetdb_port=50001
local_monetdb_username="executor"
local_monetdb_password="executor"
public_monetdb_username="guest"
public_monetdb_password="guest"
Binary file modified tests/test_data/globalworker.db
Binary file not shown.
Binary file modified tests/test_data/localworker1.db
Binary file not shown.
Binary file modified tests/test_data/localworker2.db
Binary file not shown.

0 comments on commit c122b0f

Please sign in to comment.