Skip to content

Commit

Permalink
Refactor fx workspace export and fx workspace dockerize
Browse files Browse the repository at this point in the history
Signed-off-by: Shah, Karan <[email protected]>
  • Loading branch information
MasterSkepticista committed Oct 22, 2024
1 parent a95d104 commit f2acf53
Showing 1 changed file with 130 additions and 140 deletions.
270 changes: 130 additions & 140 deletions openfl/interface/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from sys import executable
from typing import Tuple, Union

import docker
from click import Choice
from click import Path as ClickPath
from click import echo, group, option, pass_context
Expand Down Expand Up @@ -169,71 +168,6 @@ def create(prefix, template):
print_tree(prefix, level=3)


@workspace.command(name="export")
def export_() -> str:
    """
    Exports the OpenFL workspace (in current directory)
    to an archive.

    \b
    The archive contains the following files/dirs copied as-is:
    - `src`: All experiment source code.
    - `plan`: The FL plan.
    - `save`: Model initial weights.
    - `requirements.txt`: Package list required for the experiment.

    This archive does *not* copy `data` or `logs` directories. It creates
    empty placeholders which can be mounted or populated by respective collaborators.

    This command takes no arguments.

    Returns:
        str: File name of the created archive (`<workspace_name>.zip`),
            written to the current directory.

    Raises:
        FileNotFoundError: If `plan/plan.yaml` does not exist under the
            current directory.
    """
    plan_file = os.path.abspath(os.path.join("plan", "plan.yaml"))
    if not os.path.isfile(plan_file):
        raise FileNotFoundError(
            f"{plan_file} does not exist in the current directory.\n"
            "Please ensure this command is being run from a workspace."
        )
    plan.freeze_plan(plan_file)

    # Certificates, keys and caches under `src`/`plan` are never exported.
    _IGNORE_FILE_PATTERNS = [
        "__pycache__",
        "*.crt",
        "*.key",
        "*.csr",
        "*.srl",
        "*.pem",
        "*.pbuf",
    ]
    _ARCHIVE_FORMAT = "zip"
    workspace_name = os.path.basename(os.getcwd())
    ignore = shutil.ignore_patterns(*_IGNORE_FILE_PATTERNS)

    # Create a staging area; it is removed once the archive has been written.
    staging_root = tempfile.mkdtemp()
    try:
        tmp_dir = os.path.join(staging_root, "openfl", workspace_name)

        # Export the minimum required files to set up a collaborator
        os.makedirs(os.path.join(tmp_dir, "logs"), exist_ok=True)
        os.makedirs(os.path.join(tmp_dir, "data"), exist_ok=True)
        shutil.copytree("src", os.path.join(tmp_dir, "src"), ignore=ignore)
        shutil.copytree("plan", os.path.join(tmp_dir, "plan"), ignore=ignore)
        shutil.copytree("save", os.path.join(tmp_dir, "save"))
        shutil.copy2("requirements.txt", os.path.join(tmp_dir, "requirements.txt"))

        # Copy the workspace identifier, falling back to the stock one only
        # when the workspace does not have its own.
        _ws_identifier_file = ".workspace"
        if os.path.isfile(_ws_identifier_file):
            shutil.copy2(_ws_identifier_file, tmp_dir)
        else:
            openfl_ws_identifier_file = os.path.join(WORKSPACE, "workspace", _ws_identifier_file)
            logging.warning(
                f"`{_ws_identifier_file}` is missing, "
                f"copying {openfl_ws_identifier_file} as-is."
            )
            shutil.copy2(openfl_ws_identifier_file, tmp_dir)

        # Create Zip archive of directory
        shutil.make_archive(workspace_name, _ARCHIVE_FORMAT, tmp_dir)
    finally:
        shutil.rmtree(staging_root, ignore_errors=True)

    archive = f"{workspace_name}.{_ARCHIVE_FORMAT}"
    logging.info(f"Export: {archive} created")
    return archive


@workspace.command(name="import")
@option(
"--archive",
Expand Down Expand Up @@ -406,89 +340,117 @@ def _get_dir_hash(path):
return hash_


# Commands for workspace packaging and distribution
# -------------------------------------------------

### fx workspace export
@workspace.command(name="export")
def export_() -> str:
    """
    Exports the OpenFL workspace (in current directory)
    to an archive.

    \b
    The archive contains the following files/dirs copied as-is:
    - `src`: All experiment source code.
    - `plan`: The FL plan directory.
    - `save`: Model initial weights.
    - `requirements.txt`: Package list required for the experiment.

    This archive does *not* copy `data`, `logs`, or secrets. Empty `data`
    and `logs` placeholders are created instead.

    This command takes no arguments.

    Returns:
        str: File name of the created archive (`<workspace_name>.zip`),
            written to the current directory.

    Raises:
        FileNotFoundError: If `plan/plan.yaml` does not exist under the
            current directory.
    """
    plan_file = os.path.abspath(os.path.join("plan", "plan.yaml"))
    if not os.path.isfile(plan_file):
        raise FileNotFoundError(
            f"{plan_file} does not exist in the current directory.\n"
            "Please ensure this command is being run from a workspace."
        )
    plan.freeze_plan(plan_file)

    workspace_name = os.path.basename(os.getcwd())
    # Certificates, keys and caches under `src`/`plan` are never exported.
    ignore = shutil.ignore_patterns(
        "__pycache__", "*.crt", "*.key", "*.csr", "*.srl", "*.pem", "*.pbuf"
    )
    _ARCHIVE_FORMAT = "zip"

    # Create a staging area; it is removed once the archive has been written.
    staging_root = tempfile.mkdtemp()
    try:
        tmp_dir = os.path.join(staging_root, "openfl", workspace_name)

        # Export the minimum required files to set up a collaborator
        os.makedirs(os.path.join(tmp_dir, "logs"), exist_ok=True)
        os.makedirs(os.path.join(tmp_dir, "data"), exist_ok=True)
        shutil.copytree("src", os.path.join(tmp_dir, "src"), ignore=ignore)
        shutil.copytree("plan", os.path.join(tmp_dir, "plan"), ignore=ignore)
        shutil.copytree("save", os.path.join(tmp_dir, "save"))
        shutil.copy2("requirements.txt", os.path.join(tmp_dir, "requirements.txt"))

        # Copy the workspace identifier, falling back to the stock one only
        # when the workspace does not have its own. (Copying `.workspace`
        # unconditionally would raise exactly when the fallback is needed.)
        _ws_identifier_file = ".workspace"
        if os.path.isfile(_ws_identifier_file):
            shutil.copy2(_ws_identifier_file, tmp_dir)
        else:
            openfl_ws_identifier_file = os.path.join(WORKSPACE, "workspace", _ws_identifier_file)
            logging.warning(
                f"`{_ws_identifier_file}` is missing, "
                f"copying {openfl_ws_identifier_file} as-is."
            )
            shutil.copy2(openfl_ws_identifier_file, tmp_dir)

        # Create Zip archive of directory
        shutil.make_archive(workspace_name, _ARCHIVE_FORMAT, tmp_dir)
    finally:
        shutil.rmtree(staging_root, ignore_errors=True)

    archive = f"{workspace_name}.{_ARCHIVE_FORMAT}"
    logging.info(f"Export: {archive} created")
    return archive

### fx workspace dockerize
@workspace.command(name="dockerize")
@option(
    "--save",
    required=False,
    help="Export the docker image as <workspace_name>.tar file.",
    default=True,
)
@pass_context
def dockerize_(context, save):
    """Package current workspace as a Docker image.

    Builds the `openfl` base image, exports the current workspace to an
    archive, builds a workspace image tagged with the workspace name and,
    if requested, saves that image as `<workspace_name>.tar`.

    Args:
        context: The click context in which the command is being invoked.
        save (bool): Whether to export the workspace image as a tarball.

    Raises:
        Exception: If any of the underlying `docker` commands fails.
    """
    # Local import keeps this block self-contained; the commands below are run
    # through a shell, so every interpolated path/name must be quoted.
    from shlex import quote

    openfl_docker_dir = os.path.join(SITEPACKS, "openfl-docker")

    # Build OpenFL base image.
    base_image_build_cmd = (
        "DOCKER_BUILDKIT=1 docker build {options} "
        "-t {image_name} "
        "-f {dockerfile} "
        "{build_context}"
    ).format(
        options="",
        image_name="openfl",
        dockerfile=quote(os.path.join(openfl_docker_dir, "Dockerfile.base")),
        build_context=quote(str(SITEPACKS)),
    )
    _execute(base_image_build_cmd)

    # Create workspace archive.
    archive = context.invoke(export_)
    # Strip only the archive extension: a workspace name may itself contain
    # dots, which makes `archive.split(".")` fail to unpack.
    workspace_name = os.path.splitext(archive)[0]
    ws_image_build_cmd = (
        "DOCKER_BUILDKIT=1 docker build {options} "
        "--build-arg WORKSPACE_NAME={workspace_name} "
        "-t {image_name} "
        "-f {dockerfile} "
        "{build_context}"
    ).format(
        options="",
        image_name=quote(workspace_name),
        workspace_name=quote(workspace_name),
        dockerfile=quote(os.path.join(openfl_docker_dir, "Dockerfile.workspace")),
        build_context=".",
    )
    _execute(ws_image_build_cmd)

    # Export workspace as tarball (optional)
    if save:
        logging.info("Saving workspace docker image...")
        save_image_cmd = "docker save {image_name} -o {image_name}.tar"
        _execute(save_image_cmd.format(image_name=quote(workspace_name)))
        logging.info(f"Docker image saved to file: {workspace_name}.tar")


@workspace.command(name="graminize")
Expand Down Expand Up @@ -654,3 +616,31 @@ def apply_template_plan(prefix, template):
template_plan = Plan.parse(WORKSPACE / template / "plan" / "plan.yaml")

Plan.dump(prefix / "plan" / "plan.yaml", template_plan.config)


def _execute(cmd: str, verbose=True) -> None:
"""Executes `cmd` as a subprocess
Args:
cmd (str): Command to be executed.
Raises:
Exception: If return code is nonzero
Returns:
`stdout` of the command as list of messages
"""
logging.info(f"Executing: {cmd}")
process = subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
stdout_log = list()
for line in process.stdout:
msg = line.rstrip().decode("utf-8")
stdout_log.append(msg)
if verbose:
logging.info(msg)

process.communicate()
if process.returncode != 0:
raise Exception(f"`{cmd}` failed with return code {process.returncode}")

return stdout_log

0 comments on commit f2acf53

Please sign in to comment.