Skip to content

Commit

Permalink
Added an environmental variable for controller_port.
Browse files Browse the repository at this point in the history
  • Loading branch information
KFilippopolitis committed May 28, 2024
1 parent b75e75f commit c687ab1
Show file tree
Hide file tree
Showing 22 changed files with 77 additions and 31 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
ip = "172.17.0.1"
log_level = "DEBUG"
framework_log_level ="INFO"
controller_port = 5000
monetdb_image = "madgik/exareme2_db:dev"
rabbitmq_image = "madgik/exareme2_rabbitmq:dev"
Expand Down
53 changes: 41 additions & 12 deletions exareme2/algorithms/flower/flower_data_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@
import pymonetdb
import requests
from pydantic import BaseModel
from pydantic import ValidationError
from sklearn import preprocessing
from sklearn.impute import SimpleImputer

# Constants for project directories and environment configurations
PROJECT_ROOT = Path(__file__).resolve().parents[3]
CONTROLLER_PORT = os.getenv("CONTROLLER_PORT", 5000)
RESULT_URL = f"http://127.0.0.1:{CONTROLLER_PORT}/flower/result"
INPUT_URL = f"http://127.0.0.1:{CONTROLLER_PORT}/flower/input"
CDES_URL = f"http://127.0.0.1:{CONTROLLER_PORT}/cdes_metadata"
HEADERS = {"Content-type": "application/json", "Accept": "text/plain"}


class Inputdata(BaseModel):
Expand Down Expand Up @@ -80,18 +86,41 @@ def preprocess_data(inputdata, full_data):


def post_result(result: dict) -> None:
url = "http://127.0.0.1:5000/flower/result"
headers = {"Content-type": "application/json", "Accept": "text/plain"}
requests.post(url, data=json.dumps(result), headers=headers)
try:
response = requests.post(RESULT_URL, data=json.dumps(result), headers=HEADERS)
response.raise_for_status()
except requests.RequestException as e:
error_msg = {"error": str(e)}
requests.post(RESULT_URL, data=json.dumps(error_msg), headers=HEADERS)


def get_input() -> Inputdata:
response = requests.get("http://127.0.0.1:5000/flower/input")
return Inputdata.parse_raw(response.text)


def get_enumerations(data_model, variable_name):
response = requests.get("http://127.0.0.1:5000/cdes_metadata")
cdes_metadata = json.loads(response.text)
enumerations = cdes_metadata[data_model][variable_name]["enumerations"]
return [code for code, label in enumerations.items()]
try:
response = requests.get(INPUT_URL)
response.raise_for_status()
input_data = Inputdata.parse_raw(response.text)
return input_data
except (requests.RequestException, json.JSONDecodeError, ValidationError) as e:
error_msg = {"error": str(e)}
requests.post(RESULT_URL, data=json.dumps(error_msg), headers=HEADERS)


def get_enumerations(data_model: str, variable_name: str) -> list:
try:
response = requests.get(CDES_URL)
response.raise_for_status()
cdes_metadata = response.json()
if data_model not in cdes_metadata:
raise KeyError(f"'{data_model}' key not found in cdes_metadata")

if variable_name not in cdes_metadata[data_model]:
raise KeyError(f"'{variable_name}' key not found in {data_model}")

enumerations = cdes_metadata[data_model][variable_name].get("enumerations")
if enumerations is not None:
return [code for code, label in enumerations.items()]
else:
raise KeyError(f"'enumerations' key not found in {variable_name}")
except (requests.RequestException, KeyError, json.JSONDecodeError) as e:
error_msg = {"error": str(e)}
requests.post(RESULT_URL, data=json.dumps(error_msg), headers=HEADERS)
1 change: 1 addition & 0 deletions exareme2/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ WORKER_IDENTIFIER=globalworker
WORKER_ROLE=GLOBALWORKER
LOG_LEVEL=INFO
FRAMEWORK_LOG_LEVEL=INFO
CONTROLLER_PORT=5000
PROTECT_LOCAL_DATA=false
RABBITMQ_IP=172.17.0.1
RABBITMQ_PORT=5670
Expand Down
1 change: 1 addition & 0 deletions exareme2/worker/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ role = "$WORKER_ROLE"

log_level = "$LOG_LEVEL"
framework_log_level = "$FRAMEWORK_LOG_LEVEL"
controller_port = "$CONTROLLER_PORT"

[privacy]
minimum_row_count = 10
Expand Down
33 changes: 14 additions & 19 deletions exareme2/worker/flower/starter/flower_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from exareme2 import ALGORITHM_FOLDERS
from exareme2.algorithms.flower.process_manager import FlowerProcess
from exareme2.worker import config as worker_config
from exareme2.worker.utils.logger import get_logger
Expand All @@ -10,7 +9,7 @@


@initialise_logger
def start_flower_client(request_id: str, algorithm_name, worker_id) -> int:
def start_flower_client(request_id: str, algorithm_name) -> int:
env_vars = {
"MONETDB_IP": worker_config.monetdb.ip,
"MONETDB_PORT": worker_config.monetdb.port,
Expand All @@ -19,35 +18,31 @@ def start_flower_client(request_id: str, algorithm_name, worker_id) -> int:
"MONETDB_DB": worker_config.monetdb.database,
"SERVER_ADDRESS": SERVER_ADDRESS,
"NUMBER_OF_CLIENTS": worker_config.monetdb.database,
"CONTROLLER_PORT": worker_config.controller_port,
}
with open(f"/tmp/exareme2/{worker_id}.out", "a") as f:
process = FlowerProcess(
f"{algorithm_name}/client.py", env_vars=env_vars, stderr=f, stdout=f
)
running_processes[request_id] = process
logger = get_logger()
process = FlowerProcess(f"{algorithm_name}/client.py", env_vars=env_vars)
running_processes[request_id] = process
logger = get_logger()

logger.info("Starting client.py")
pid = process.start(logger)
logger.info("Starting client.py")
pid = process.start(logger)
logger.info(f"Started client.py process id: {pid}")
return pid


@initialise_logger
def start_flower_server(
request_id: str, algorithm_name: str, number_of_clients: int, worker_id
request_id: str, algorithm_name: str, number_of_clients: int
) -> int:
env_vars = {
"SERVER_ADDRESS": SERVER_ADDRESS,
"NUMBER_OF_CLIENTS": number_of_clients,
"CONTROLLER_PORT": worker_config.controller_port,
}
with open(f"/tmp/exareme2/{worker_id}.out", "a") as f:
process = FlowerProcess(
f"{algorithm_name}/server.py", env_vars=env_vars, stderr=f, stdout=f
)
running_processes[request_id] = process
logger = get_logger()
logger.info("Starting server.py")
pid = process.start(logger)
process = FlowerProcess(f"{algorithm_name}/server.py", env_vars=env_vars)
running_processes[request_id] = process
logger = get_logger()
logger.info("Starting server.py")
pid = process.start(logger)
logger.info(f"Started server.py process id: {pid}")
return pid
2 changes: 2 additions & 0 deletions kubernetes/templates/exareme2-globalnode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ spec:
value: {{ .Values.log_level }}
- name: FRAMEWORK_LOG_LEVEL
value: {{ .Values.framework_log_level }}
- name: CONTROLLER_PORT
value: {{ .Values.controller_port }}
- name: PROTECT_LOCAL_DATA
value: "false" # The GLOBALWORKER does not need to secure its data, since they are not private.
- name: CELERY_TASKS_TIMEOUT
Expand Down
2 changes: 2 additions & 0 deletions kubernetes/templates/exareme2-localnode.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ spec:
value: {{ .Values.log_level }}
- name: FRAMEWORK_LOG_LEVEL
value: {{ .Values.framework_log_level }}
- name: CONTROLLER_PORT
value: {{ .Values.controller_port }}
- name: PROTECT_LOCAL_DATA
value: "true"
- name: CELERY_TASKS_TIMEOUT
Expand Down
1 change: 1 addition & 0 deletions kubernetes/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ exareme2_images:

log_level: INFO
framework_log_level: ERROR
controller_port: 5000

max_concurrent_experiments: 32

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
ip = "172.17.0.1"
log_level = "INFO"
framework_log_level ="INFO"
controller_port = 5000
monetdb_image = "madgik/exareme2_db:testing"
rabbitmq_image = "madgik/exareme2_rabbitmq:testing"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
ip = "172.17.0.1"
log_level = "INFO"
framework_log_level ="INFO"
controller_port = 5000
monetdb_image = "madgik/exareme2_db:testing"
rabbitmq_image = "madgik/exareme2_rabbitmq:testing"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ exareme2_images:

log_level: DEBUG
framework_log_level: INFO
controller_port : 5000

max_concurrent_experiments: 32

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ exareme2_images:

log_level: DEBUG
framework_log_level: INFO
controller_port : 5000

db:
credentials_location: /opt/exareme2/credentials
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testsmpcglobalworker"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4501
role = "GLOBALWORKER"
monetdb_nclients = 64

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testsmpclocalworker1"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4501
role = "LOCALWORKER"
monetdb_nclients = 64

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testsmpclocalworker2"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4501
role = "LOCALWORKER"
monetdb_nclients = 64

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testglobalworker"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4500
role = "GLOBALWORKER"
monetdb_nclients = 128

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testlocalworker1"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4500
role = "LOCALWORKER"
monetdb_nclients = 64

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testlocalworker2"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4500
role = "LOCALWORKER"
monetdb_nclients = 64

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testlocalworkertmp"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4500
role = "LOCALWORKER"
monetdb_nclients = 64

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testsmpcglobalworker"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4501
role = "GLOBALWORKER"
monetdb_nclients = 64

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testsmpclocalworker1"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4501
role = "LOCALWORKER"
monetdb_nclients = 64

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
identifier = "testsmpclocalworker2"
log_level = "DEBUG"
framework_log_level = "INFO"
controller_port = 4501
role = "LOCALWORKER"
monetdb_nclients = 64

Expand Down

0 comments on commit c687ab1

Please sign in to comment.