
Update Deployment Logic:
- Kubernetes: Ensure GlobalWorker is deployed even in single worker deployments.
- Flower: Configure the server to run on the GlobalWorker.
Kostas Filippopolitis committed Jul 15, 2024
1 parent cab4379 commit 2cb2b20
Showing 9 changed files with 67 additions and 42 deletions.
.github/workflows/prod_env_tests.yml (32 additions, 8 deletions)
@@ -221,7 +221,7 @@ jobs:
           GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
           kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/dementia_v_0_1/CDEsMetadata.json'
-      - name: Load dementia dataset csvs into localworkers and globalworker
+      - name: Load dementia dataset csvs into localworkers
         run: |
           for suffix in {0..9}; do
             if [ $suffix -eq 0 ] || [ $suffix -eq 3 ] || [ $suffix -eq 6 ]; then
@@ -230,10 +230,19 @@ jobs:
               POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name")
             else
               POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
+              for dataset in edsd ppmi desd-synthdata; do
+                if [[ ! $dataset$suffix.csv == *test.csv ]]; then
+                  kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/dementia_v_0_1/${dataset}${suffix}.csv -d dementia -v 0.1"
+                fi
+              done
             fi
-            for dataset in edsd ppmi desd-synthdata; do
-              kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/dementia_v_0_1/${dataset}${suffix}.csv -d dementia -v 0.1"
-            done
           done
+      - name: Load dementia test dataset csvs into globalworker
+        run: |
+          GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
+          for dataset in edsd ppmi desd-synthdata; do
+            kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/dementia_v_0_1/${dataset}-test.csv -d dementia -v 0.1"
+          done
       - name: Load tbi data model into localworkers and globalworker
@@ -245,7 +254,7 @@ jobs:
           GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
           kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/tbi_v_0_1/CDEsMetadata.json'
-      - name: Load tbi dataset csvs into localworkers and globalworker
+      - name: Load tbi dataset csvs into localworkers
         run: |
           for suffix in {0..9}; do
             if [ $suffix -eq 0 ] || [ $suffix -eq 3 ] || [ $suffix -eq 6 ]; then
@@ -254,10 +263,17 @@ jobs:
               POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name")
             else
               POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
+              if [[ ! dummy_tbi$suffix.csv == *test.csv ]]; then
+                kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi${suffix}.csv -d tbi -v 0.1"
+              fi
             fi
-            kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi${suffix}.csv -d tbi -v 0.1"
           done
+      - name: Load tbi test dataset csvs into globalworker
+        run: |
+          GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
+          kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/tbi_v_0_1/dummy_tbi-test.csv -d tbi -v 0.1"
       - name: Load longitudinal dementia data model into localworkers and globalworker
         run: |
           for i in 0 1; do
@@ -267,7 +283,7 @@ jobs:
           GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
           kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c 'mipdb add-data-model /opt/data/longitudinal_dementia_v_0_1/CDEsMetadata.json'
-      - name: Load longitudinal dementia datasets into localworkers and globalworker
+      - name: Load longitudinal dementia datasets into localworkers
         run: |
           for suffix in {0..2}; do
             if [ $suffix -eq 0 ]; then
@@ -276,10 +292,18 @@ jobs:
               POD_NAME=$(kubectl get pods -l=nodeType=localworker -o json | jq -r ".items[1].metadata.name")
             else
               POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
+              if [[ ! longitudinal_dementia$suffix.csv == *test.csv ]]; then
+                kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/longitudinal_dementia_v_0_1/longitudinal_dementia${suffix}.csv -d longitudinal_dementia -v 0.1"
+              fi
             fi
-            kubectl exec $POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/longitudinal_dementia_v_0_1/longitudinal_dementia${suffix}.csv -d longitudinal_dementia -v 0.1"
           done
+      - name: Load longitudinal dementia test dataset csvs into globalworker
+        run: |
+          GLOBAL_POD_NAME=$(kubectl get pods -l=nodeType=globalworker -o json | jq -r '.items[0].metadata.name')
+          kubectl exec $GLOBAL_POD_NAME -c db-importer -- sh -c "mipdb add-dataset /opt/data/longitudinal_dementia_v_0_1/longitudinal_dementia-test.csv -d longitudinal_dementia -v 0.1"
       - name: Controller logs
         run: kubectl logs -l app=exareme2-controller --tail -1

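All three dataset-loading steps above apply the same routing rule: numbered CSVs are spread across the workers by suffix, while any file matching *test.csv is skipped in the per-suffix loop and loaded into the global worker by a dedicated follow-up step. A minimal Python sketch of that rule (the workflow's elif branch is collapsed in the diff, so the exact suffix split between the second local worker and the global worker below is an assumption):

```python
from fnmatch import fnmatch

def route_dataset(filename: str, suffix: int) -> str:
    """Mirror of the workflow's routing: test splits go only to the
    global worker; numbered files are spread across pods by suffix."""
    if fnmatch(filename, "*test.csv"):
        return "globalworker"
    if suffix in (0, 3, 6):
        return "localworker[0]"
    if suffix in (1, 4, 7):  # assumption: this branch is collapsed in the diff
        return "localworker[1]"
    return "globalworker"

print(route_dataset("edsd-test.csv", 0))  # globalworker
print(route_dataset("edsd0.csv", 0))      # localworker[0]
```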
exareme2/controller/services/flower/controller.py (5 additions, 11 deletions)
@@ -70,18 +70,12 @@ async def exec_algorithm(self, algorithm_name, algorithm_request_dto):
             for worker in workers_info
         ]

-        server_task_handler, server_ip, server_id = (
-            task_handlers[0],
-            workers_info[0].ip,
-            workers_info[0].id,
-        )
-        if len(task_handlers) > 1:
-            global_worker = self.worker_landscape_aggregator.get_global_worker()
-            server_task_handler = self._create_worker_tasks_handler(
-                request_id, global_worker
-            )
-            server_ip = global_worker.ip
-            server_id = global_worker.id
+        global_worker = self.worker_landscape_aggregator.get_global_worker()
+        server_task_handler = self._create_worker_tasks_handler(
+            request_id, global_worker
+        )
+        server_ip = global_worker.ip
+        server_id = global_worker.id
         # Garbage Collect
         server_task_handler.garbage_collect()
         for handler in task_handlers:
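The net effect of the controller change, condensed (names as they appear in the diff; the surrounding controller class and its helpers are assumed):

```python
# Before: the Flower server defaulted to the first worker in the request and
# moved to the global worker only when more than one worker took part, so a
# single-worker request could end up with the server on a local worker.

# After: the server is always placed on the global worker, which the
# Kubernetes template change below guarantees exists even in one-worker
# deployments.
global_worker = self.worker_landscape_aggregator.get_global_worker()
server_task_handler = self._create_worker_tasks_handler(request_id, global_worker)
server_ip, server_id = global_worker.ip, global_worker.id
```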
kubernetes/templates/exareme2-globalnode.yaml (3 additions, 2 deletions)
@@ -1,4 +1,3 @@
-{{ if gt .Values.localnodes 1.0}} # Single node deployment
 apiVersion: apps/v1
 kind: Deployment
 metadata:
@@ -26,6 +25,9 @@ spec:
       - name: csv-data
         hostPath:
           path: {{ .Values.db.csvs_location }}
+      - name: credentials
+        hostPath:
+          path: {{ .Values.db.credentials_location }}
       containers:
       - name: monetdb
         image: {{ .Values.exareme2_images.repository }}/exareme2_db:{{ .Values.exareme2_images.version }}
@@ -190,4 +192,3 @@ spec:
               - exareme2.worker.healthcheck
             periodSeconds: 30
             timeoutSeconds: 10
-{{end}}
tasks.py (13 additions, 11 deletions)
@@ -503,6 +503,19 @@ def load_datasets(
     :param worker_id_and_ports: A list of tuples containing worker identifiers and ports.
     :param use_sockets: Flag to determine if data will be loaded via sockets.
     """
+    if len(worker_id_and_ports) == 1:
+        worker_id, port = worker_id_and_ports[0]
+        for file in filenames:
+            if file.endswith(".csv") and not file.endswith("test.csv"):
+                csv = os.path.join(dirpath, file)
+                message(
+                    f"Loading dataset {pathlib.PurePath(csv).name} in MonetDB at port {port}...",
+                    Level.HEADER,
+                )
+                cmd = f"poetry run mipdb add-dataset {csv} -d {data_model_code} -v {data_model_version} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}"
+                run(c, cmd)
+        return
+
     # Load the first set of CSVs into the first worker
     first_worker_csvs = sorted(
         [
@@ -581,17 +594,6 @@ def load_test_datasets(
     if not local_worker_id_and_ports:
         raise Exception("Local worker config files cannot be loaded.")

-    # If only one local worker is specified, load the entire folder to that worker
-    if len(local_worker_id_and_ports) == 1:
-        worker_id, port = local_worker_id_and_ports[0]
-        cmd = f"poetry run mipdb load-folder {TEST_DATA_FOLDER} --copy_from_file {not use_sockets} {get_monetdb_configs_in_mipdb_format(port)} {get_sqlite_path(worker_id)}"
-        message(
-            f"Loading the folder '{TEST_DATA_FOLDER}' in MonetDB at port {port}...",
-            Level.HEADER,
-        )
-        run(c, cmd)
-        return
-
     # Process each dataset in the TEST_DATA_FOLDER for local workers
     for dirpath, dirnames, filenames in os.walk(TEST_DATA_FOLDER):
         if "CDEsMetadata.json" not in filenames:
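The new early-return branch in load_datasets applies the same naming convention as the workflow: anything ending in test.csv is held back from the per-worker load. The predicate, distilled from the diff, with file names drawn from the test data above as examples:

```python
def should_load(filename: str) -> bool:
    # Load regular CSVs; skip held-out test splits and non-CSV metadata.
    return filename.endswith(".csv") and not filename.endswith("test.csv")

assert should_load("edsd0.csv")
assert not should_load("edsd-test.csv")      # test split: global worker only
assert not should_load("CDEsMetadata.json")  # not a CSV
```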
@@ -24,10 +24,7 @@ def test_logistic_regression(get_algorithm_result):
     }
     input["type"] = "flower"
     algorithm_result = get_algorithm_result("logistic_regression", input)
-    print(algorithm_result)
-    assert algorithm_result == {"accuracy": 0.63} or algorithm_result == {
-        "accuracy": 0.3819241982507289
-    }
+    assert algorithm_result == {"accuracy": 0.63}


def test_logistic_regression_with_filters(get_algorithm_result):
Expand Down Expand Up @@ -69,7 +66,4 @@ def test_logistic_regression_with_filters(get_algorithm_result):
}
input["type"] = "flower"
algorithm_result = get_algorithm_result("logistic_regression", input)
print(algorithm_result)
assert algorithm_result == {"accuracy": 0.7884615384615384} or algorithm_result == {
"accuracy": 0.22443181818181818
}
assert algorithm_result == {"accuracy": 0.7884615384615384}
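With the server pinned to the global worker, the result no longer depends on worker placement, which is why the alternate accuracy values are dropped. A hypothetical regression guard in the same style (the test name and INPUT payload are illustrative, not part of the commit):

```python
def test_logistic_regression_is_deterministic(get_algorithm_result):
    # Hypothetical check: with the Flower server always on the global worker,
    # repeated runs of the same request should agree exactly.
    first = get_algorithm_result("logistic_regression", INPUT)  # INPUT: assumed payload
    second = get_algorithm_result("logistic_regression", INPUT)
    assert first == second
```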
tests/algorithm_validation_tests/one_node_deployment_template.toml (12 additions, 2 deletions)
@@ -33,11 +33,21 @@ optional=false
 enabled = false

 [[workers]]
-id = "localworker1"
-role="LOCALWORKER"
+id = "globalworker"
+role="GLOBALWORKER"
 rabbitmq_port=5670
 monetdb_port=50000
 local_monetdb_username="executor"
 local_monetdb_password="executor"
 public_monetdb_username="guest"
 public_monetdb_password="guest"
+
+[[workers]]
+id = "localworker1"
+role="LOCALWORKER"
+rabbitmq_port=5671
+monetdb_port=50001
+local_monetdb_username="executor"
+local_monetdb_password="executor"
+public_monetdb_username="guest"
+public_monetdb_password="guest"
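The template now declares a GLOBALWORKER entry even for the one-node deployment, matching the controller's unconditional get_global_worker() call. A sketch of the lookup the configuration now supports (tomllib is the Python 3.11+ standard-library parser; the repo's actual config loader is not shown in this diff):

```python
import tomllib  # Python 3.11+ standard library

with open("one_node_deployment_template.toml", "rb") as f:
    workers = tomllib.load(f)["workers"]

# Fails loudly if no global worker is configured, which is exactly the
# situation the template change rules out.
global_worker = next(w for w in workers if w["role"] == "GLOBALWORKER")
print(global_worker["id"], global_worker["monetdb_port"])  # globalworker 50000
```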
Binary file removed: tests/test_data/globalworker.db
Binary file removed: tests/test_data/localworker1.db
Binary file removed: tests/test_data/localworker2.db
