Skip to content

Commit

Permalink
Modified
Browse files Browse the repository at this point in the history
Signed-off-by: noopur <[email protected]>
  • Loading branch information
noopurintel committed Dec 16, 2024
1 parent dd307cb commit 634e3f1
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 172 deletions.
40 changes: 1 addition & 39 deletions .github/workflows/tr_docker_gramine_direct.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,8 @@ jobs:
run: |
cd example_workspace
fx collaborator create -d 1 -n charlie --silent
fx collaborator create -d 2 -n delta --silent
fx collaborator generate-cert-request -n charlie --silent
fx collaborator generate-cert-request -n delta --silent
fx collaborator certify --request-pkg col_charlie_to_agg_cert_request.zip --silent
fx collaborator certify --request-pkg col_delta_to_agg_cert_request.zip --silent
# Pack the collaborator's private key, signed cert, and data.yaml into a tarball
tarfiles="plan/data.yaml agg_to_col_charlie_signed_cert.zip"
Expand All @@ -63,19 +60,9 @@ jobs:
tar -cf cert_col_charlie.tar $tarfiles
tarfiles="plan/data.yaml agg_to_col_delta_signed_cert.zip"
for entry in cert/client/*; do
if [[ "$entry" == *.key ]]; then
tarfiles="$tarfiles $entry"
fi
done
tar -cf cert_col_delta.tar $tarfiles
# Clean up
rm -f $tarfiles
rm -f col_charlie_to_agg_cert_request.zip
rm -f col_delta_to_agg_cert_request.zip
- name: Create signed cert for aggregator
run: |
Expand All @@ -100,7 +87,7 @@ jobs:
set -x
docker run --rm \
--network host --name aggregator \
--network host \
--security-opt seccomp=unconfined \
--mount type=bind,source=./cert_agg.tar,target=/certs.tar \
--env KERAS_HOME=/tmp \
Expand All @@ -113,28 +100,3 @@ jobs:
--mount type=bind,source=./cert_col_charlie.tar,target=/certs.tar \
--env KERAS_HOME=/tmp \
example_workspace bash -c "tar -xf /certs.tar && fx collaborator certify --import agg_to_col_charlie_signed_cert.zip && gramine-direct fx collaborator start -n charlie"
docker run --rm \
--network host \
--security-opt seccomp=unconfined \
--mount type=bind,source=./cert_col_delta.tar,target=/certs.tar \
--env KERAS_HOME=/tmp \
example_workspace bash -c "tar -xf /certs.tar && fx collaborator certify --import agg_to_col_delta_signed_cert.zip && gramine-direct fx collaborator start -n delta"
docker run --rm \
--network host \
--device=/dev/sgx_enclave \
-v /var/run/aesmd/aesm.socket:/var/run/aesmd/aesm.socket \
--security-opt seccomp=unconfined \
--mount type=bind,source=./cert_col_charlie.tar,target=/certs.tar \
--env KERAS_HOME=/tmp \
example_workspace bash -c "tar -xf /certs.tar && gramine-sgx fx collaborator certify --import agg_to_col_charlie_signed_cert.zip && gramine-direct fx collaborator start -n charlie"
docker run --rm \
--network host \
--device=/dev/sgx_enclave \
-v /var/run/aesmd/aesm.socket:/var/run/aesmd/aesm.socket \
--security-opt seccomp=unconfined \
--mount type=bind,source=./cert_col_delta.tar,target=/certs.tar \
--env KERAS_HOME=/tmp \
example_workspace bash -c "tar -xf /certs.tar && gramine-sgx fx collaborator certify --import agg_to_col_delta_signed_cert.zip && gramine-direct fx collaborator start -n delta"
73 changes: 21 additions & 52 deletions .github/workflows/tr_docker_native.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,99 +28,68 @@ jobs:
- name: Create workspace image
run: |
fx workspace create --prefix two_machines --template keras_cnn_mnist
cd two_machines
fx plan initialize -a aggregator
fx workspace dockerize --save --revision https://github.com/noopurintel/openfl.git@noopur/dockerize-ws
fx workspace create --prefix example_workspace --template keras_cnn_mnist
cd example_workspace
fx plan initialize -a localhost
fx workspace dockerize --save --revision https://github.com/${GITHUB_REPOSITORY}.git@${{ github.event.pull_request.head.sha }}
- name: Create certificate authority for workspace
run: |
cd two_machines
cd example_workspace
fx workspace certify
- name: Create signed cert for collaborator charlie
- name: Create signed cert for collaborator
run: |
cd two_machines
cd example_workspace
fx collaborator create -d 1 -n charlie --silent
fx collaborator generate-cert-request -n charlie --silent
fx collaborator certify --request-pkg col_charlie_to_agg_cert_request.zip --silent
# Pack the collaborator's private key, signed cert, and data.yaml into a tarball
tarfiles1="plan/data.yaml agg_to_col_charlie_signed_cert.zip"
tarfiles="plan/data.yaml agg_to_col_charlie_signed_cert.zip"
for entry in cert/client/*; do
if [[ "$entry" == *.key ]]; then
tarfiles1="$tarfiles1 $entry"
tarfiles="$tarfiles $entry"
fi
done
tar -cf cert_col_charlie.tar $tarfiles1
- name: Create signed cert for collaborator darwin
run: |
cd two_machines
fx collaborator create -d 2 -n darwin --silent
fx collaborator generate-cert-request -n darwin --silent
fx collaborator certify --request-pkg col_darwin_to_agg_cert_request.zip --silent
# Pack the collaborator's private key, signed cert, and data.yaml into a tarball
tarfiles2="plan/data.yaml agg_to_col_darwin_signed_cert.zip"
for entry in cert/client/*; do
if [[ "$entry" == *.key ]]; then
tarfiles2="$tarfiles2 $entry"
fi
done
# it will have key file for charlie as well
# todo - remove the key file for charlie
tar -cf cert_col_darwin.tar $tarfiles2
tar -cf cert_col_charlie.tar $tarfiles
- name: Cleanup files
run: |
# Clean up
rm -f $tarfiles1
rm -f $tarfiles2
rm -f $tarfiles
rm -f col_charlie_to_agg_cert_request.zip
rm -f col_darwin_to_agg_cert_request.zip
- name: Create signed cert for aggregator
run: |
cd two_machines
fx aggregator generate-cert-request --fqdn aggregator
fx aggregator certify --fqdn aggregator --silent
cd example_workspace
fx aggregator generate-cert-request --fqdn localhost
fx aggregator certify --fqdn localhost --silent
# Pack all files that aggregator needs to start training
tar -cf cert_agg.tar plan cert save
# Remove the directories after archiving
rm -rf plan cert save
- name: Load workspace image (on another machine, if applicable)
- name: Load workspace image
run: |
cd two_machines
# scp two_machines.tar and relevant cert tar file to the machine
docker load -i two_machines.tar
cd example_workspace
docker load -i example_workspace.tar
- name: Run aggregator and collaborator
run: |
cd two_machines
cd example_workspace
set -x
docker run --rm \
--network host --name aggregator \
--network host \
--mount type=bind,source=./cert_agg.tar,target=/certs.tar \
two_machines bash -c "tar -xf /certs.tar && fx aggregator start"
example_workspace bash -c "tar -xf /certs.tar && fx aggregator start" &
# TODO: Run with two collaborators instead.
docker run --rm \
--network host --name charlie \
--network host \
--mount type=bind,source=./cert_col_charlie.tar,target=/certs.tar \
two_machines bash -c "tar -xf /certs.tar && fx collaborator certify --import agg_to_col_charlie_signed_cert.zip && fx collaborator start -n charlie"
# Need to copy cert_col_darwin.tar to another machine
docker run --rm \
--network host --name darwin \
--mount type=bind,source=./cert_col_darwin.tar,target=/certs.tar \
two_machines bash -c "tar -xf /certs.tar && fx collaborator certify --import agg_to_col_darwin_signed_cert.zip && fx collaborator start -n darwin"
# EXCEPTION : [Errno 2] No such file or directory: 'cert/client/col_darwin.key'
example_workspace bash -c "tar -xf /certs.tar && fx collaborator certify --import agg_to_col_charlie_signed_cert.zip && fx collaborator start -n charlie"
4 changes: 0 additions & 4 deletions tests/end_to_end/models/model_owner.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,6 @@ def dockerize_workspace(self):
)
fh.verify_cmd_output(output, return_code, error, error_msg, "Workspace dockerized successfully")

return_code, output, error = ssh.run_command("docker images", work_dir=self.workspace_path)
if return_code != 0:
raise Exception(f"Failed to list the docker images: {error}")
log.info(f"List of docker images: {output}")
except Exception as e:
raise ex.WorkspaceDockerizationException(f"{error_msg}: {e}")

Expand Down
30 changes: 1 addition & 29 deletions tests/end_to_end/test_suites/task_runner_dockerized_ws_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,8 @@ def test_federation_via_dockerized_workspace(request, fx_federation_tr_dws):
request (Fixture): Pytest fixture
fx_federation (Fixture): Pytest fixture
"""
executor = concurrent.futures.ThreadPoolExecutor()
try:
results = [
executor.submit(
ssh.run_command,
cmd="tar -xf /certs.tar",
work_dir=participant.workspace_path,
)
for participant in [fx_federation_tr_dws.aggregator] + fx_federation_tr_dws.collaborators
]
if not all([f.result() for f in results]):
raise Exception("Failed to extract certificates for one or more participants")
except Exception as e:
raise e

try:
results = [
executor.submit(
collaborator.import_pki,
zip_name=f"agg_to_col_{collaborator.name}_signed_cert.zip"
)
for collaborator in fx_federation_tr_dws.collaborators
]
if not all([f.result() for f in results]):
raise Exception("Failed to import and certify the CSR for one or more collaborators")
except Exception as e:
raise e

# Start the federation
results = fed_helper.run_federation(fx_federation_tr_dws)
results = fed_helper.run_federation_for_dws(fx_federation_tr_dws, use_tls=request.config.use_tls)

# Verify the completion of the federation run
assert fed_helper.verify_federation_run_completion(fx_federation_tr_dws, results, request.config.num_rounds), "Federation completion failed"
62 changes: 36 additions & 26 deletions tests/end_to_end/utils/common_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,24 @@ def fx_federation_tr(request):
test_env = fh.get_test_env_from_markers(request)

if test_env not in ["task_runner_docker", "task_runner_basic"]:
raise ValueError("Use fx_federation_tr_dws for this test environment: task_runner_dockerized_ws")
raise ValueError(
"Use fx_federation_tr_dws for this test environment: task_runner_dockerized_ws"
)

collaborators = []
executor = concurrent.futures.ThreadPoolExecutor()

model_name, workspace_path, local_bind_path, agg_domain_name = fh.federation_env_setup_and_validate(request)
model_name, workspace_path, local_bind_path, agg_domain_name = (
fh.federation_env_setup_and_validate(request)
)

agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path)

# Create model owner object and the workspace for the model
# Workspace name will be same as the model name
model_owner = mo_model.ModelOwner(model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path)
model_owner = mo_model.ModelOwner(
model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path
)

# Create workspace for given model name
fh.create_persistent_store(model_owner.name, local_bind_path)
Expand Down Expand Up @@ -85,7 +91,7 @@ def fx_federation_tr(request):
aggregator = agg_model.Aggregator(
agg_domain_name=agg_domain_name,
workspace_path=agg_workspace_path,
container_id=model_owner.container_id, # None in case of non-docker environment
container_id=model_owner.container_id, # None in case of non-docker environment
)

# Generate the sign request and certify the aggregator in case of TLS
Expand Down Expand Up @@ -136,18 +142,24 @@ def fx_federation_tr_dws(request):
Note: As this is a function level fixture, thus no import is required at test level.
"""
if fh.get_test_env_from_markers(request) != "task_runner_dockerized_ws":
raise ValueError("Use fx_federation_tr_dws for this test environment: task_runner_dockerized_ws")
raise ValueError(
"Use fx_federation_tr_dws for this test environment: task_runner_dockerized_ws"
)

collaborators = []
executor = concurrent.futures.ThreadPoolExecutor()

model_name, workspace_path, local_bind_path, agg_domain_name = fh.federation_env_setup_and_validate(request)
model_name, workspace_path, local_bind_path, agg_domain_name = (
fh.federation_env_setup_and_validate(request)
)

agg_workspace_path = constants.AGG_WORKSPACE_PATH.format(workspace_path)

# Create model owner object and the workspace for the model
# Workspace name will be same as the model name
model_owner = mo_model.ModelOwner(model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path)
model_owner = mo_model.ModelOwner(
model_name, request.config.log_memory_usage, workspace_path=agg_workspace_path
)

# Create workspace for given model name
fh.create_persistent_store(model_owner.name, local_bind_path)
Expand Down Expand Up @@ -177,7 +189,7 @@ def fx_federation_tr_dws(request):
aggregator = agg_model.Aggregator(
agg_domain_name=agg_domain_name,
workspace_path=agg_workspace_path,
container_id=model_owner.container_id, # None in case of non-docker environment
container_id=model_owner.container_id, # None in case of non-docker environment
)

futures = [
Expand All @@ -194,35 +206,33 @@ def fx_federation_tr_dws(request):
if request.config.use_tls:
fh.setup_pki_for_collaborators(collaborators, model_owner, local_bind_path)

fh.create_tarball_for_collaborators(collaborators, local_bind_path)
fh.create_tarball_for_collaborators(
collaborators, local_bind_path, use_tls=request.config.use_tls
)

# Generate the sign request and certify the aggregator in case of TLS
if request.config.use_tls:
aggregator.generate_sign_request()
model_owner.certify_aggregator(agg_domain_name)
local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path)
return_code, output, error = ssh.run_command(f"tar -cf cert_agg.tar plan cert save", work_dir=local_agg_ws_path)
if return_code != 0:
raise Exception(f"Failed to create tar for aggregator: {error}")

local_agg_ws_path = constants.AGG_WORKSPACE_PATH.format(local_bind_path)
return_code, output, error = ssh.run_command(
f"tar -cf cert_agg.tar plan cert save", work_dir=local_agg_ws_path
)
if return_code != 0:
raise Exception(f"Failed to create tar for aggregator: {error}")

# When no name is provided 'fx workspace dockerize --save ..' will use the last folder name
# which is workspace in this case for tar and image name.
image_name = "workspace"
model_owner.load_workspace(workspace_tar_name=f"{image_name}.tar")

futures = [
executor.submit(
dh.start_docker_container,
container_name=participant.name,
workspace_path=workspace_path,
local_bind_path=local_bind_path,
image=image_name,
mount_mapping=["cert_agg.tar:/certs.tar"] if participant.name == "aggregator" else [f"cert_col_{participant.name}.tar:/certs.tar"],
)
for participant in collaborators + [aggregator]
]
results = [f.result() for f in futures]
log.info(f"Result of starting docker containers: {results}")
fh.start_docker_containers_for_dws(
participants=[aggregator] + collaborators,
workspace_path=workspace_path,
local_bind_path=local_bind_path,
image_name=image_name,
)

# Return the federation fixture
return federation_fixture(
Expand Down
Loading

0 comments on commit 634e3f1

Please sign in to comment.