From 6b69126d987aa3f223cebdfa9e8b3945773c6272 Mon Sep 17 00:00:00 2001
From: Patrick Foley
Date: Mon, 23 Dec 2024 11:41:20 -0800
Subject: [PATCH] Bring python versions and callbacks into 1.7 release (#1228)

* Bump Python version to 3.10 - 3.12 (#1213)

* update python version to 3.12
Signed-off-by: yes
* dummy commit
Signed-off-by: yes
* dummy commit
Signed-off-by: yes
* update python version to 3.12
Signed-off-by: yes
* dummy commit
Signed-off-by: yes
* dummy commit
Signed-off-by: yes
* removed files
Signed-off-by: yes
* reverted doc change
Signed-off-by: yes
* added missing requirements for Workflow Interface Tests
Signed-off-by: yes
* added tensorboard
Signed-off-by: yes

---------

Signed-off-by: yes

* Introduce `callbacks` API (#1195)

* Get rid of kwargs
Signed-off-by: Shah, Karan
* Use module-level logger
Signed-off-by: Shah, Karan
* Reduce keras verbosity
Signed-off-by: Shah, Karan
* Remove all log_metric and log_memory_usage traces; add callback hooks
Signed-off-by: Shah, Karan
* Add `openfl.callbacks` module
Signed-off-by: Shah, Karan
* Include round_num for task callbacks
Signed-off-by: Shah, Karan
* Add tensordb to callbacks
Signed-off-by: Shah, Karan
* No round_num on task callbacks
Signed-off-by: Shah, Karan
* Remove task boundary callbacks
Signed-off-by: Shah, Karan
* Remove tb/model_ckpt. Add memory_profiler
Signed-off-by: Shah, Karan
* Restore psutil and tbX
Signed-off-by: Shah, Karan
* Format code
Signed-off-by: Shah, Karan
* Define default callbacks
Signed-off-by: Shah, Karan
* Add write_logs for bwd compat
Signed-off-by: Shah, Karan
* Add log_metric_callback for bwd compat
Signed-off-by: Shah, Karan
* Migrate to module-level logger for collaborator
Signed-off-by: Shah, Karan
* Review comments
Signed-off-by: Shah, Karan
* Add metric_writer
Signed-off-by: Shah, Karan
* Add collaborator side metric logging
Signed-off-by: Shah, Karan
* Make log dirs on exp begin
Signed-off-by: Shah, Karan
* Do not print use_tls
Signed-off-by: Shah, Karan
* Assume reportable metric to be a scalar
Signed-off-by: Shah, Karan
* Add aggregator side callbacks
Signed-off-by: Shah, Karan
* do_task test returns mock dict
Signed-off-by: Shah, Karan
* Consistency changes
Signed-off-by: Shah, Karan
* Add documentation hooks
Signed-off-by: Shah, Karan
* Update docstring
Signed-off-by: Shah, Karan
* Update docs hook
Signed-off-by: Shah, Karan
* Remove all traces of log_metric_callback and write_metric
Signed-off-by: Shah, Karan
* Do on_round_begin if not time_to_quit
Signed-off-by: Shah, Karan

---------

Signed-off-by: Shah, Karan

---------

Signed-off-by: yes
Signed-off-by: Shah, Karan
Co-authored-by: Shailesh Tanwar <135304487+tanwarsh@users.noreply.github.com>
Co-authored-by: Karan Shah
---
 .github/workflows/docker-bench-security.yml    | 2 +-
 .github/workflows/double_ws_export.yml         | 2 +-
 .../workflows/experimental_workflow_tests.yml  | 2 +-
 .github/workflows/hadolint.yml                 | 2 +-
 .github/workflows/lint.yml                     | 2 +-
 .github/workflows/pki.yml                      | 4 +-
 .github/workflows/pytest_coverage.yml          | 2 +-
 .github/workflows/straggler-handling.yml       | 2 +-
 .github/workflows/task_runner_basic_e2e.yml    | 2 +-
 .github/workflows/taskrunner.yml               | 2 +-
 .../workflows/taskrunner_eden_pipeline.yml     | 2 +-
 .../workflows/tr_docker_gramine_direct.yml     | 2 +-
 .github/workflows/tr_docker_native.yml         | 2 +-
 .github/workflows/ubuntu.yml                   | 4 +-
 .github/workflows/windows.yml                  | 4 +-
 Jenkinsfile                                    | 4 +-
 docs/about/features_index/taskrunner.rst       | 2 +-
 .../running_the_federation.notebook.rst        | 2 +-
 .../running_the_federation_with_gandlf.rst     | 2 +-
docs/openfl.callbacks.rst | 16 ++ docs/openfl.rst | 3 +- .../404_Keras_MNIST_with_FedProx.ipynb | 2 +- .../101_torch_cnn_mnist/plan/plan.yaml | 2 - .../101_torch_cnn_mnist/src/utils.py | 20 -- .../102_aggregator_validation/plan/plan.yaml | 2 - .../102_aggregator_validation/src/utils.py | 20 -- .../104_keras_mnist/plan/plan.yaml | 2 - .../104_keras_mnist/src/utils.py | 20 -- .../plan/plan.yaml | 2 - .../src/utils.py | 22 --- .../plan/plan.yaml | 2 - .../src/utils.py | 20 -- .../vertical_fl/plan/plan.yaml | 2 - .../vertical_fl/src/utils.py | 20 -- .../vertical_fl_two_party/plan/plan.yaml | 2 - .../vertical_fl_two_party/src/utils.py | 20 -- .../plan/plan.yaml | 2 - .../src/mnist_utils.py | 16 -- .../torch_cnn_mnist_fed_eval/plan/plan.yaml | 2 - .../src/mnist_utils.py | 16 -- .../plan/plan.yaml | 2 - .../src/mnist_utils.py | 16 -- .../torch_llm_horovod/plan/plan.yaml | 2 - .../torch_llm_horovod/src/emotion_utils.py | 16 -- .../workspace/plan/defaults/aggregator.yaml | 1 - openfl/callbacks/__init__.py | 7 + openfl/callbacks/callback.py | 56 ++++++ openfl/callbacks/callback_list.py | 95 ++++++++++ openfl/callbacks/lambda_callback.py | 38 ++++ openfl/callbacks/memory_profiler.py | 62 ++++++ openfl/callbacks/metric_writer.py | 69 +++++++ openfl/component/aggregator/aggregator.py | 179 ++++++++---------- openfl/component/collaborator/collaborator.py | 137 ++++++++------ openfl/federated/plan/plan.py | 15 +- openfl/transport/grpc/aggregator_server.py | 1 - openfl/utilities/logs.py | 82 -------- setup.py | 4 +- ...ements_experimental_localruntime_tests.txt | 4 + .../collaborator/test_collaborator.py | 3 +- 59 files changed, 540 insertions(+), 508 deletions(-) create mode 100644 docs/openfl.callbacks.rst delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/src/utils.py delete mode 100644 openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/src/utils.py create mode 100644 openfl/callbacks/__init__.py create mode 100644 openfl/callbacks/callback.py create mode 100644 openfl/callbacks/callback_list.py create mode 100644 openfl/callbacks/lambda_callback.py create mode 100644 openfl/callbacks/memory_profiler.py create mode 100644 openfl/callbacks/metric_writer.py diff --git a/.github/workflows/docker-bench-security.yml b/.github/workflows/docker-bench-security.yml index 1d1b56b486..a64282a309 100644 --- a/.github/workflows/docker-bench-security.yml +++ b/.github/workflows/docker-bench-security.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/double_ws_export.yml b/.github/workflows/double_ws_export.yml index d4d6b0459c..a33aea7d64 100644 --- a/.github/workflows/double_ws_export.yml +++ b/.github/workflows/double_ws_export.yml @@ -26,7 +26,7 @@ 
jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/experimental_workflow_tests.yml b/.github/workflows/experimental_workflow_tests.yml index d99cec2e5f..84166ab362 100644 --- a/.github/workflows/experimental_workflow_tests.yml +++ b/.github/workflows/experimental_workflow_tests.yml @@ -25,7 +25,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/hadolint.yml b/.github/workflows/hadolint.yml index 6e90292158..a529491bdb 100644 --- a/.github/workflows/hadolint.yml +++ b/.github/workflows/hadolint.yml @@ -22,7 +22,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 13d53e885d..1556bad3b3 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -22,7 +22,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install linters run: | python -m pip install --upgrade pip diff --git a/.github/workflows/pki.yml b/.github/workflows/pki.yml index d8ecbecb03..704fbbfcea 100644 --- a/.github/workflows/pki.yml +++ b/.github/workflows/pki.yml @@ -26,7 +26,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -43,7 +43,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/pytest_coverage.yml b/.github/workflows/pytest_coverage.yml index 9371f74e13..18fa9f25fe 100644 --- a/.github/workflows/pytest_coverage.yml +++ b/.github/workflows/pytest_coverage.yml @@ -27,7 +27,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/straggler-handling.yml b/.github/workflows/straggler-handling.yml index 47b7f05709..4f2bdb7dd1 100644 --- a/.github/workflows/straggler-handling.yml +++ b/.github/workflows/straggler-handling.yml @@ -29,7 +29,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/task_runner_basic_e2e.yml b/.github/workflows/task_runner_basic_e2e.yml index c75deb5eb3..b50eedd526 100644 --- a/.github/workflows/task_runner_basic_e2e.yml +++ b/.github/workflows/task_runner_basic_e2e.yml @@ -37,7 +37,7 @@ jobs: # There are open issues for some of the models, so excluding them for now: # model_name: [ "torch_cnn_mnist", "keras_cnn_mnist", "torch_cnn_histology" ] model_name: ["torch_cnn_mnist", "keras_cnn_mnist"] - python_version: ["3.9", "3.10", "3.11"] + python_version: ["3.10", "3.11", "3.12"] fail-fast: false # do not immediately fail if one of the combinations fail env: diff --git a/.github/workflows/taskrunner.yml 
b/.github/workflows/taskrunner.yml index 1e3a3ab6c2..59abb67251 100644 --- a/.github/workflows/taskrunner.yml +++ b/.github/workflows/taskrunner.yml @@ -21,7 +21,7 @@ jobs: strategy: matrix: os: ['ubuntu-latest', 'windows-latest'] - python-version: ["3.9", "3.10", "3.11"] + python-version: ["3.10", "3.11", "3.12"] runs-on: ${{ matrix.os }} timeout-minutes: 15 diff --git a/.github/workflows/taskrunner_eden_pipeline.yml b/.github/workflows/taskrunner_eden_pipeline.yml index 44de314a5f..b6d2426c46 100644 --- a/.github/workflows/taskrunner_eden_pipeline.yml +++ b/.github/workflows/taskrunner_eden_pipeline.yml @@ -25,7 +25,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/tr_docker_gramine_direct.yml b/.github/workflows/tr_docker_gramine_direct.yml index 855a059c98..d02526edb7 100644 --- a/.github/workflows/tr_docker_gramine_direct.yml +++ b/.github/workflows/tr_docker_gramine_direct.yml @@ -20,7 +20,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/tr_docker_native.yml b/.github/workflows/tr_docker_native.yml index 36dfcf9107..b3382553ae 100644 --- a/.github/workflows/tr_docker_native.yml +++ b/.github/workflows/tr_docker_native.yml @@ -20,7 +20,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index 4c3c99af43..c968e85f11 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -21,7 +21,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -43,7 +43,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index 1e5b245cc1..341b93b7f1 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -20,7 +20,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip @@ -41,7 +41,7 @@ jobs: - name: Set up Python 3 uses: actions/setup-python@v3 with: - python-version: "3.9" + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/Jenkinsfile b/Jenkinsfile index c1904d0453..73f919c844 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -81,7 +81,7 @@ pipeline { SNYK_ALLOW_LONG_PROJECT_NAME = true SNYK_USE_MULTI_PROC = true SNYK_DEBUG = true - SNYK_PYTHON_VERSION = '3.9' + SNYK_PYTHON_VERSION = '3.10' BANDIT_SOURCE_PATH = 'openfl/ openfl-workspace/ openfl-tutorials/' BANDIT_SEVERITY_LEVEL = 'high' @@ -114,7 +114,7 @@ pipeline { stage('Build Package') { agent { docker { - image 'python:3.9' + image 'python:3.10' } } steps { diff --git a/docs/about/features_index/taskrunner.rst b/docs/about/features_index/taskrunner.rst index 66abcf6868..2097c72f32 100644 --- 
a/docs/about/features_index/taskrunner.rst +++ b/docs/about/features_index/taskrunner.rst @@ -142,7 +142,7 @@ Bare Metal Approach STEP 1: Create a Workspace ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -1. Start a Python 3.9 (>=3.9, <3.12) virtual environment and confirm OpenFL is available. +1. Start a Python 3.10 (>=3.10, <3.13) virtual environment and confirm OpenFL is available. .. code-block:: shell diff --git a/docs/developer_guide/running_the_federation.notebook.rst b/docs/developer_guide/running_the_federation.notebook.rst index e15d3b91de..44e18e1380 100644 --- a/docs/developer_guide/running_the_federation.notebook.rst +++ b/docs/developer_guide/running_the_federation.notebook.rst @@ -17,7 +17,7 @@ You will start a Jupyter\* \ lab server and receive a URL you can use to access Start the Tutorials =================== -1. Start a Python\* \ 3.9 (>=3.9, <3.12) virtual environment and confirm OpenFL is available. +1. Start a Python\* \ 3.10 (>=3.10, <3.13) virtual environment and confirm OpenFL is available. .. code-block:: python diff --git a/docs/developer_guide/running_the_federation_with_gandlf.rst b/docs/developer_guide/running_the_federation_with_gandlf.rst index e2d2cd37f9..3e8dc7707f 100644 --- a/docs/developer_guide/running_the_federation_with_gandlf.rst +++ b/docs/developer_guide/running_the_federation_with_gandlf.rst @@ -113,7 +113,7 @@ However, continue with the following procedure for details in creating a federat STEP 1: Install GaNDLF prerequisites and Create a Workspace ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -1. Start a Python 3.9 (>=3.9, <3.12) virtual environment and confirm OpenFL is available. +1. Start a Python 3.10 (>=3.10, <3.13) virtual environment and confirm OpenFL is available. .. code-block:: python diff --git a/docs/openfl.callbacks.rst b/docs/openfl.callbacks.rst new file mode 100644 index 0000000000..e6ac82f65a --- /dev/null +++ b/docs/openfl.callbacks.rst @@ -0,0 +1,16 @@ +``openfl.callbacks`` module +=========================== + +.. currentmodule:: openfl.callbacks + +.. automodule:: openfl.callbacks + +.. autosummary:: + :toctree: _autosummary + :recursive: + + Callback + CallbackList + LambdaCallback + MetricWriter + MemoryProfiler diff --git a/docs/openfl.rst b/docs/openfl.rst index 9d053c2173..a4dd53dc5c 100644 --- a/docs/openfl.rst +++ b/docs/openfl.rst @@ -1,6 +1,6 @@ .. currentmodule:: openfl -Public API: ``openfl`` package +API Reference: ``openfl`` =========================== Subpackages @@ -10,6 +10,7 @@ Subpackages :maxdepth: 1 openfl.component + openfl.callbacks openfl.cryptography openfl.experimental openfl.databases diff --git a/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb b/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb index 69dbd4159a..9ada0f38b7 100644 --- a/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb +++ b/openfl-tutorials/experimental/workflow/404_Keras_MNIST_with_FedProx.ipynb @@ -51,7 +51,7 @@ "id": "4dbb89b6", "metadata": {}, "source": [ - "First we start by installing the necessary dependencies for the workflow interface. Note that this tuorial uses Keras 3, make sure you use python 3.9 or higher." + "First we start by installing the necessary dependencies for the workflow interface. Note that this tuorial uses Keras 3, make sure you use python 3.10 or higher." 
] }, { diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/plan/plan.yaml index b4a50d8c62..ab0440875d 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/101_torch_cnn_mnist/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/plan/plan.yaml index c42ce135c9..413c871ab2 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/102_aggregator_validation/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/plan/plan.yaml index dca3c92bbf..e119a0b047 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/plan/plan.yaml +++ 
b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/src/utils.py deleted file mode 100644 index 96fe885713..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/104_keras_mnist/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from tensorflow.summary import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/plan/plan.yaml index 0f37dc3a77..0f0ddcb802 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/src/utils.py deleted file mode 100644 index a3db4c1ecf..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/301_torch_cnn_mnist_watermarking/src/utils.py +++ /dev/null @@ -1,22 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -"""You may copy this file as the starting point of your own model.""" - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/plan/plan.yaml index 7f3a47bb39..c2239a0a19 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.Aggregator 
settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/501_pytorch_tinyimagenet_transfer_learning/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/plan/plan.yaml index 061afcc41d..da42ea4dc9 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.aggregator.Aggregator settings : rounds_to_train : 1 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/plan/plan.yaml b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/plan/plan.yaml index f30c5481ea..7819432251 100644 --- a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/plan/plan.yaml +++ b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/plan/plan.yaml @@ -6,8 +6,6 @@ aggregator : template : openfl.experimental.workflow.component.aggregator.Aggregator settings : rounds_to_train : 10 - log_metric_callback : - template : src.utils.write_metric collaborator : diff --git a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/src/utils.py b/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/src/utils.py deleted file mode 100644 index 1e56f3e68d..0000000000 --- 
a/openfl-workspace/experimental/workflow/AggregatorBasedWorkflow/vertical_fl_two_party/src/utils.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (C) 2020-2021 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from torch.utils.tensorboard import SummaryWriter - - -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) diff --git a/openfl-workspace/torch_cnn_mnist_eden_compression/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist_eden_compression/plan/plan.yaml index 96eb8a35f9..283a3dc97c 100644 --- a/openfl-workspace/torch_cnn_mnist_eden_compression/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist_eden_compression/plan/plan.yaml @@ -9,8 +9,6 @@ aggregator : best_state_path : save/torch_cnn_mnist_best.pbuf last_state_path : save/torch_cnn_mnist_last.pbuf rounds_to_train : 10 - log_metric_callback : - template : src.mnist_utils.write_metric collaborator : diff --git a/openfl-workspace/torch_cnn_mnist_eden_compression/src/mnist_utils.py b/openfl-workspace/torch_cnn_mnist_eden_compression/src/mnist_utils.py index 16ee801b4d..a03e1e6da2 100644 --- a/openfl-workspace/torch_cnn_mnist_eden_compression/src/mnist_utils.py +++ b/openfl-workspace/torch_cnn_mnist_eden_compression/src/mnist_utils.py @@ -6,27 +6,11 @@ from logging import getLogger import numpy as np -from torch.utils.tensorboard import SummaryWriter from torchvision import datasets from torchvision import transforms logger = getLogger(__name__) -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) - def one_hot(labels, classes): """ diff --git a/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml index 02db7dff3e..580ce79760 100644 --- a/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist_fed_eval/plan/plan.yaml @@ -8,8 +8,6 @@ aggregator : init_state_path : save/torch_cnn_mnist_init.pbuf best_state_path : save/torch_cnn_mnist_best.pbuf last_state_path : save/torch_cnn_mnist_last.pbuf - log_metric_callback : - template : src.mnist_utils.write_metric collaborator : defaults : plan/defaults/collaborator.yaml diff --git a/openfl-workspace/torch_cnn_mnist_fed_eval/src/mnist_utils.py b/openfl-workspace/torch_cnn_mnist_fed_eval/src/mnist_utils.py index 1eccd2a95d..95fa35fa6f 100644 --- a/openfl-workspace/torch_cnn_mnist_fed_eval/src/mnist_utils.py +++ b/openfl-workspace/torch_cnn_mnist_fed_eval/src/mnist_utils.py @@ -6,27 +6,11 @@ from logging import getLogger import numpy as np -from torch.utils.tensorboard import SummaryWriter from torchvision import datasets from torchvision import transforms logger = getLogger(__name__) -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric 
callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) - def one_hot(labels, classes): """ diff --git a/openfl-workspace/torch_cnn_mnist_straggler_check/plan/plan.yaml b/openfl-workspace/torch_cnn_mnist_straggler_check/plan/plan.yaml index a42b064e56..b2b12f047a 100644 --- a/openfl-workspace/torch_cnn_mnist_straggler_check/plan/plan.yaml +++ b/openfl-workspace/torch_cnn_mnist_straggler_check/plan/plan.yaml @@ -9,8 +9,6 @@ aggregator : best_state_path : save/torch_cnn_mnist_best.pbuf last_state_path : save/torch_cnn_mnist_last.pbuf rounds_to_train : 6 - log_metric_callback : - template : src.mnist_utils.write_metric collaborator : diff --git a/openfl-workspace/torch_cnn_mnist_straggler_check/src/mnist_utils.py b/openfl-workspace/torch_cnn_mnist_straggler_check/src/mnist_utils.py index 16ee801b4d..a03e1e6da2 100644 --- a/openfl-workspace/torch_cnn_mnist_straggler_check/src/mnist_utils.py +++ b/openfl-workspace/torch_cnn_mnist_straggler_check/src/mnist_utils.py @@ -6,27 +6,11 @@ from logging import getLogger import numpy as np -from torch.utils.tensorboard import SummaryWriter from torchvision import datasets from torchvision import transforms logger = getLogger(__name__) -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter('./logs/cnn_mnist', flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f'{node_name}/{task_name}/{metric_name}', metric, round_number) - def one_hot(labels, classes): """ diff --git a/openfl-workspace/torch_llm_horovod/plan/plan.yaml b/openfl-workspace/torch_llm_horovod/plan/plan.yaml index 037b27f437..86ae2579b8 100644 --- a/openfl-workspace/torch_llm_horovod/plan/plan.yaml +++ b/openfl-workspace/torch_llm_horovod/plan/plan.yaml @@ -9,8 +9,6 @@ aggregator : best_state_path : save/torch_llm_best.pbuf last_state_path : save/torch_llm_last.pbuf rounds_to_train : 5 - log_metric_callback : - template : src.emotion_utils.write_metric collaborator : diff --git a/openfl-workspace/torch_llm_horovod/src/emotion_utils.py b/openfl-workspace/torch_llm_horovod/src/emotion_utils.py index 8eeed70e8b..48fb330c72 100644 --- a/openfl-workspace/torch_llm_horovod/src/emotion_utils.py +++ b/openfl-workspace/torch_llm_horovod/src/emotion_utils.py @@ -6,26 +6,10 @@ from logging import getLogger from datasets import Dataset, load_dataset -from torch.utils.tensorboard import SummaryWriter from transformers import AutoTokenizer, DataCollatorWithPadding logger = getLogger(__name__) -writer = None - - -def get_writer(): - """Create global writer object.""" - global writer - if not writer: - writer = SummaryWriter("./logs/llm", flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback.""" - get_writer() - writer.add_scalar(f"{node_name}/{task_name}/{metric_name}", metric, round_number) - def get_emotion_dataset(tokenizer): dataset = load_dataset("dair-ai/emotion", cache_dir="dataset", revision="9ce6303") diff --git a/openfl-workspace/workspace/plan/defaults/aggregator.yaml b/openfl-workspace/workspace/plan/defaults/aggregator.yaml index 0bb76e099d..43d923b996 100644 --- a/openfl-workspace/workspace/plan/defaults/aggregator.yaml +++ b/openfl-workspace/workspace/plan/defaults/aggregator.yaml @@ -1,4 +1,3 @@ template : openfl.component.Aggregator settings : db_store_rounds : 2 - write_logs : true diff 
--git a/openfl/callbacks/__init__.py b/openfl/callbacks/__init__.py new file mode 100644 index 0000000000..8cd3f911ba --- /dev/null +++ b/openfl/callbacks/__init__.py @@ -0,0 +1,7 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +from openfl.callbacks.callback import Callback +from openfl.callbacks.callback_list import CallbackList +from openfl.callbacks.lambda_callback import LambdaCallback +from openfl.callbacks.memory_profiler import MemoryProfiler +from openfl.callbacks.metric_writer import MetricWriter diff --git a/openfl/callbacks/callback.py b/openfl/callbacks/callback.py new file mode 100644 index 0000000000..133d64a7cf --- /dev/null +++ b/openfl/callbacks/callback.py @@ -0,0 +1,56 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + + +class Callback: + """Base class for callbacks. + + Callbacks can be used to perform actions at different stages of the + Federated Learning process. To create a custom callback, subclass + `openfl.callbacks.Callback` and implement the necessary methods. + + Callbacks can be triggered on the aggregator and collaborator side + for the following events: + * At the beginning of an experiment + * At the beginning of a round + * At the end of a round + * At the end of an experiment + + Attributes: + params: Additional parameters saved for use within the callback. + tensor_db: The `TensorDB` instance of the respective participant. + """ + + def __init__(self): + self.params = None + self.tensor_db = None + + def set_params(self, params): + self.params = params + + def set_tensor_db(self, tensor_db): + self.tensor_db = tensor_db + + def on_round_begin(self, round_num: int, logs=None): + """Callback function to be executed at the beginning of a round. + + Subclasses need to implement actions to be taken here. + """ + + def on_round_end(self, round_num: int, logs=None): + """Callback function to be executed at the end of a round. + + Subclasses need to implement actions to be taken here. + """ + + def on_experiment_begin(self, logs=None): + """Callback function to be executed at the beginning of an experiment. + + Subclasses need to implement actions to be taken here. + """ + + def on_experiment_end(self, logs=None): + """Callback function to be executed at the end of an experiment. + + Subclasses need to implement actions to be taken here. + """ diff --git a/openfl/callbacks/callback_list.py b/openfl/callbacks/callback_list.py new file mode 100644 index 0000000000..29661fff6a --- /dev/null +++ b/openfl/callbacks/callback_list.py @@ -0,0 +1,95 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +from openfl.callbacks.callback import Callback +from openfl.callbacks.memory_profiler import MemoryProfiler +from openfl.callbacks.metric_writer import MetricWriter + + +class CallbackList(Callback): + """An ensemble of callbacks. + + This class allows multiple callbacks to be used together, by sequentially + calling each callback's respective methods. + + Attributes: + callbacks: A list of `openfl.callbacks.Callback` instances. + add_memory_profiler: If True, adds a `MemoryProfiler` callback to the list. + add_metric_writer: If True, adds a `MetricWriter` callback to the list. + tensor_db: Optional `TensorDB` instance of the respective participant. + If provided, callbacks can access TensorDB for various actions. + params: Additional parameters saved for use within the callbacks. 
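+
+    Example (an illustrative sketch, not part of this diff; it mirrors how the
+    Aggregator constructs its callback list later in this patch, passing
+    ``origin`` through ``params`` for use by the default callbacks)::
+
+        from openfl.callbacks import CallbackList, LambdaCallback
+
+        callbacks = CallbackList(
+            [LambdaCallback(on_round_end=lambda round_num, logs=None: print(logs))],
+            add_metric_writer=True,  # appends a default MetricWriter
+            origin="aggregator",
+        )
+        callbacks.on_experiment_begin()
+        callbacks.on_round_begin(0)
+        callbacks.on_round_end(0, {"train/loss": 0.1})
+        callbacks.on_experiment_end()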
+ """ + + def __init__( + self, + callbacks: list, + add_memory_profiler=False, + add_metric_writer=False, + tensor_db=None, + **params, + ): + super().__init__() + self.callbacks = _flatten(callbacks) if callbacks else [] + + self._add_default_callbacks(add_memory_profiler, add_metric_writer) + + self.set_tensor_db(tensor_db) + self.set_params(params) + + def set_params(self, params): + self.params = params + if params: + for callback in self.callbacks: + callback.set_params(params) + + def set_tensor_db(self, tensor_db): + self.tensor_db = tensor_db + if tensor_db: + for callback in self.callbacks: + callback.set_tensor_db(tensor_db) + + def _add_default_callbacks(self, add_memory_profiler, add_metric_writer): + """Add default callbacks to callbacks list if not already present.""" + self._memory_profiler = None + self._metric_writer = None + + for cb in self.callbacks: + if isinstance(cb, MemoryProfiler): + self._memory_profiler = cb + if isinstance(cb, MetricWriter): + self._metric_writer = cb + + if add_memory_profiler and self._memory_profiler is None: + self._memory_profiler = MemoryProfiler() + self.callbacks.append(self._memory_profiler) + + if add_metric_writer and self._metric_writer is None: + self._metric_writer = MetricWriter() + self.callbacks.append(self._metric_writer) + + def on_round_begin(self, round_num: int, logs=None): + for callback in self.callbacks: + callback.on_round_begin(round_num, logs) + + def on_round_end(self, round_num: int, logs=None): + for callback in self.callbacks: + callback.on_round_end(round_num, logs) + + def on_experiment_begin(self, logs=None): + for callback in self.callbacks: + callback.on_experiment_begin(logs) + + def on_experiment_end(self, logs=None): + for callback in self.callbacks: + callback.on_experiment_end(logs) + + +def _flatten(l): + """Flatten a possibly-nested tree of lists.""" + if not isinstance(l, (list, tuple)): + return [l] + for elem in l: + if isinstance(elem, list): + yield from _flatten(elem) + else: + yield elem diff --git a/openfl/callbacks/lambda_callback.py b/openfl/callbacks/lambda_callback.py new file mode 100644 index 0000000000..d1b0542f83 --- /dev/null +++ b/openfl/callbacks/lambda_callback.py @@ -0,0 +1,38 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +from openfl.callbacks.callback import Callback + + +class LambdaCallback(Callback): + """Custom on-the-fly callbacks. + + This callback can be constructed with functions that will be called + at the appropriate time during the life-cycle of a Federated Learning experiment. + Certain callbacks may expect positional arguments, for example: + + * on_round_begin: expects `round_num` as a positional argument. + * on_round_end: expects `round_num` as a positional argument. + + Args: + on_round_begin: called at the beginning of every round. + on_round_end: called at the end of every round. + on_experiment_begin: called at the beginning of an experiment. + on_experiment_end: called at the end of an experiment. 
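+
+    Example (an illustrative sketch, not part of this diff)::
+
+        from openfl.callbacks import LambdaCallback
+
+        # Print reported metrics at the end of every round.
+        round_logger = LambdaCallback(
+            on_round_end=lambda round_num, logs=None: print(round_num, logs)
+        )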
+ """ + + def __init__( + self, + on_round_begin=None, + on_round_end=None, + on_experiment_begin=None, + on_experiment_end=None, + ): + super().__init__() + if on_round_begin is not None: + self.on_round_begin = on_round_begin + if on_round_end is not None: + self.on_round_end = on_round_end + if on_experiment_begin is not None: + self.on_experiment_begin = on_experiment_begin + if on_experiment_end is not None: + self.on_experiment_end = on_experiment_end diff --git a/openfl/callbacks/memory_profiler.py b/openfl/callbacks/memory_profiler.py new file mode 100644 index 0000000000..71b6d18488 --- /dev/null +++ b/openfl/callbacks/memory_profiler.py @@ -0,0 +1,62 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import json +import logging +import os + +import psutil + +from openfl.callbacks.callback import Callback + +logger = logging.getLogger(__name__) + + +class MemoryProfiler(Callback): + """Profile memory usage of the current process at the end of each round. + + Attributes: + log_dir: If set, writes logs as lines of JSON. + """ + + def __init__(self, log_dir: str = "./logs/"): + super().__init__() + self.log_dir = None + if log_dir: + os.makedirs(log_dir, exist_ok=True) + self.log_dir = log_dir + + def on_round_end(self, round_num: int, logs=None): + origin = self.params["origin"] + + info = _get_memory_usage() + info["round_number"] = round_num + info["origin"] = origin + + logger.info(f"Round {round_num}: Memory usage: {info}") + if self.log_dir: + with open(os.path.join(self.log_dir, f"{origin}_memory_usage.json"), "a") as f: + f.write(json.dumps(info) + "\n") + + +def _get_memory_usage() -> dict: + process = psutil.Process(os.getpid()) + virtual_memory = psutil.virtual_memory() + swap_memory = psutil.swap_memory() + info = { + "process_memory": round(process.memory_info().rss / (1024**2), 2), + "virtual_memory/total": round(virtual_memory.total / (1024**2), 2), + "virtual_memory/available": round(virtual_memory.available / (1024**2), 2), + "virtual_memory/percent": virtual_memory.percent, + "virtual_memory/used": round(virtual_memory.used / (1024**2), 2), + "virtual_memory/free": round(virtual_memory.free / (1024**2), 2), + "virtual_memory/active": round(virtual_memory.active / (1024**2), 2), + "virtual_memory/inactive": round(virtual_memory.inactive / (1024**2), 2), + "virtual_memory/buffers": round(virtual_memory.buffers / (1024**2), 2), + "virtual_memory/cached": round(virtual_memory.cached / (1024**2), 2), + "virtual_memory/shared": round(virtual_memory.shared / (1024**2), 2), + "swap_memory/total": round(swap_memory.total / (1024**2), 2), + "swap_memory/used": round(swap_memory.used / (1024**2), 2), + "swap_memory/free": round(swap_memory.free / (1024**2), 2), + "swap_memory/percent": swap_memory.percent, + } + return info diff --git a/openfl/callbacks/metric_writer.py b/openfl/callbacks/metric_writer.py new file mode 100644 index 0000000000..fc9a2daa35 --- /dev/null +++ b/openfl/callbacks/metric_writer.py @@ -0,0 +1,69 @@ +# Copyright 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +import json +import logging +import os + +from tensorboardX import SummaryWriter + +from openfl.callbacks.callback import Callback + +logger = logging.getLogger(__name__) + + +class MetricWriter(Callback): + """Log scalar metrics at the end of each round. + + Attributes: + log_dir: Path to write logs as lines of JSON. Defaults to `./logs`. + use_tensorboard: If True, writes scalar summaries to TensorBoard under `log_dir`. 
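+
+    Example (an illustrative sketch, not part of this diff; in practice the
+    writer is attached for a participant via ``write_logs`` or an explicit
+    callbacks list, which also supplies the ``origin`` parameter used for
+    file naming)::
+
+        from openfl.callbacks import MetricWriter
+
+        writer = MetricWriter(log_dir="./logs/", use_tensorboard=False)
+        writer.set_params({"origin": "collaborator1"})
+        writer.on_experiment_begin()
+        writer.on_round_end(0, {"train/loss": 0.42})
+        writer.on_experiment_end()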
+ """ + + def __init__(self, log_dir: str = "./logs/", use_tensorboard: bool = True): + super().__init__() + self.log_dir = log_dir + self.use_tensorboard = use_tensorboard + + self._log_file_handle = None + self._summary_writer = None + + def on_experiment_begin(self, logs=None): + """Open file handles for logging.""" + os.makedirs(self.log_dir, exist_ok=True) + + if not self._log_file_handle: + self._log_file_handle = open( + os.path.join(self.log_dir, self.params["origin"] + "_metrics.txt"), "a" + ) + + if self.use_tensorboard: + self._summary_writer = SummaryWriter( + os.path.join(self.log_dir, self.params["origin"] + "_tensorboard") + ) + + def on_round_end(self, round_num: int, logs=None): + """Log metrics. + + Args: + round_num: The current round number. + logs: A key-value pair of scalar metrics. + """ + logs = logs or {} + logger.info(f"Round {round_num}: Metrics: {logs}") + + self._log_file_handle.write(json.dumps(logs) + "\n") + self._log_file_handle.flush() + + if self._summary_writer: + for key, value in logs.items(): + self._summary_writer.add_scalar(key, value, round_num) + self._summary_writer.flush() + + def on_experiment_end(self, logs=None): + """Cleanup.""" + if self._log_file_handle: + self._log_file_handle.close() + self._log_file_handle = None + + if self._summary_writer: + self._summary_writer.close() diff --git a/openfl/component/aggregator/aggregator.py b/openfl/component/aggregator/aggregator.py index 1e34aa7e92..eaac9fa6a0 100644 --- a/openfl/component/aggregator/aggregator.py +++ b/openfl/component/aggregator/aggregator.py @@ -4,18 +4,21 @@ """Aggregator module.""" +import logging import queue import time -from logging import getLogger from threading import Lock +from typing import List, Optional +import openfl.callbacks as callbacks_module from openfl.component.straggler_handling_functions import CutoffTimeBasedStragglerHandling from openfl.databases import TensorDB from openfl.interface.aggregation_functions import WeightedAverage from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import base_pb2, utils from openfl.utilities import TaskResultKey, TensorKey, change_tags -from openfl.utilities.logs import get_memory_usage, write_memory_usage_to_file, write_metric + +logger = logging.getLogger(__name__) class Aggregator: @@ -38,8 +41,7 @@ class Aggregator: tensor_db (TensorDB): Object for tensor database. db_store_rounds* (int): Rounds to store in TensorDB. logger: Object for logging. - write_logs (bool): Flag to enable log writing. - log_metric_callback: Callback for logging metrics. + write_logs (bool): Flag to enable metric writer callback. best_model_score (optional): Score of the best model. Defaults to None. metric_queue (queue.Queue): Queue for metrics. @@ -76,10 +78,10 @@ def __init__( single_col_cert_common_name=None, compression_pipeline=None, db_store_rounds=1, - write_logs=False, - log_memory_usage=False, - log_metric_callback=None, initial_tensor_dict=None, + log_memory_usage=False, + write_logs=False, + callbacks: Optional[List] = None, ): """Initializes the Aggregator. @@ -104,17 +106,13 @@ def __init__( NoCompressionPipeline. db_store_rounds (int, optional): Rounds to store in TensorDB. Defaults to 1. - write_logs (bool, optional): Whether to write logs. Defaults to - False. - log_metric_callback (optional): Callback for log metric. Defaults - to None. - **kwargs: Additional keyword arguments. + initial_tensor_dict (dict, optional): Initial tensor dictionary. 
+ callbacks: List of callbacks to be used during the experiment. """ - self.logger = getLogger(__name__) self.round_number = 0 if single_col_cert_common_name: - self.logger.warning( + logger.warning( "You are running in single collaborator certificate mode. " "This mode is intended for development settings only and does not " "provide proper Public Key Infrastructure (PKI) security. " @@ -128,9 +126,7 @@ def __init__( ) self._end_of_round_check_done = [False] * rounds_to_train self.stragglers = [] - # Flag can be enabled to get memory usage details for ubuntu system - self.log_memory_usage = log_memory_usage - self.memory_details = [] + self.rounds_to_train = rounds_to_train # if the collaborator requests a delta, this value is set to true @@ -145,16 +141,6 @@ def __init__( # if it is set to 1 for the aggregator. self.db_store_rounds = db_store_rounds - # Gathered together logging-related objects - self.write_logs = write_logs - self.log_metric_callback = log_metric_callback - - if self.write_logs: - self.log_metric = write_metric - if self.log_metric_callback: - self.log_metric = log_metric_callback - self.logger.info("Using custom log metric: %s", self.log_metric) - self.best_model_score = None self.metric_queue = queue.Queue() @@ -165,6 +151,7 @@ def __init__( self.best_state_path = best_state_path self.last_state_path = last_state_path + # TODO: Remove. Used in deprecated interactive and native APIs self.best_tensor_dict: dict = {} self.last_tensor_dict: dict = {} @@ -195,6 +182,19 @@ def __init__( self.use_delta_updates = use_delta_updates + # Callbacks + self.callbacks = callbacks_module.CallbackList( + callbacks, + add_memory_profiler=log_memory_usage, + add_metric_writer=write_logs, + origin="aggregator", + ) + + # TODO: Aggregator has no concrete notion of round_begin. + # https://github.com/securefederatedai/openfl/pull/1195#discussion_r1879479537 + self.callbacks.on_experiment_begin() + self.callbacks.on_round_begin(self.round_number) + def _load_initial_tensors(self): """Load all of the tensors required to begin federated learning. @@ -209,9 +209,7 @@ def _load_initial_tensors(self): ) if round_number > self.round_number: - self.logger.info( - f"Starting training from round {round_number} of previously saved model" - ) + logger.info(f"Starting training from round {round_number} of previously saved model") self.round_number = round_number tensor_key_dict = { TensorKey(k, self.uuid, self.round_number, False, ("model",)): v @@ -219,7 +217,7 @@ def _load_initial_tensors(self): } # all initial model tensors are loaded here self.tensor_db.cache_tensor(tensor_key_dict) - self.logger.debug("This is the initial tensor_db: %s", self.tensor_db) + logger.debug("This is the initial tensor_db: %s", self.tensor_db) def _load_initial_tensors_from_dict(self, tensor_dict): """Load all of the tensors required to begin federated learning. @@ -236,7 +234,7 @@ def _load_initial_tensors_from_dict(self, tensor_dict): } # all initial model tensors are loaded here self.tensor_db.cache_tensor(tensor_key_dict) - self.logger.debug("This is the initial tensor_db: %s", self.tensor_db) + logger.debug("This is the initial tensor_db: %s", self.tensor_db) def _save_model(self, round_number, file_path): """Save the best or latest model. @@ -261,7 +259,7 @@ def _save_model(self, round_number, file_path): tk_name, _, _, _, _ = tk tensor_dict[tk_name] = self.tensor_db.get_tensor_from_cache(tk) if tensor_dict[tk_name] is None: - self.logger.info( + logger.info( "Cannot save model for round %s. 
Continuing...", round_number, ) @@ -343,13 +341,13 @@ def get_tasks(self, collaborator_name): sleep_time (int): Sleep time. time_to_quit (bool): Whether it's time to quit. """ - self.logger.debug( + logger.debug( f"Aggregator GetTasks function reached from collaborator {collaborator_name}..." ) # first, if it is time to quit, inform the collaborator if self._time_to_quit(): - self.logger.info( + logger.info( "Sending signal to collaborator %s to shutdown...", collaborator_name, ) @@ -402,7 +400,7 @@ def get_tasks(self, collaborator_name): return tasks, self.round_number, sleep_time, time_to_quit - self.logger.info( + logger.info( f"Sending tasks to collaborator {collaborator_name} for round {self.round_number}" ) sleep_time = 0 @@ -421,7 +419,7 @@ def _straggler_cutoff_time_elapsed(self) -> None: Returns: None """ - self.logger.warning( + logger.warning( f"Round number: {self.round_number} cutoff timer elapsed after " f"{self.straggler_handling_policy.straggler_cutoff_time}s. " f"Applying {self.straggler_handling_policy.__class__.__name__} policy." @@ -460,7 +458,7 @@ def get_aggregated_tensor( Raises: ValueError: if Aggregator does not have an aggregated tensor for {tensor_key}. """ - self.logger.debug( + logger.debug( f"Retrieving aggregated tensor {tensor_name},{round_number},{tags} " f"for collaborator {collaborator_name}" ) @@ -490,7 +488,7 @@ def get_aggregated_tensor( start_retrieving_time = time.time() while nparray is None: - self.logger.debug("Waiting for tensor_key %s", agg_tensor_key) + logger.debug("Waiting for tensor_key %s", agg_tensor_key) time.sleep(5) nparray = self.tensor_db.get_tensor_from_cache(agg_tensor_key) if (time.time() - start_retrieving_time) > 60: @@ -609,20 +607,20 @@ def send_local_task_results( None """ if self._time_to_quit() or collaborator_name in self.stragglers: - self.logger.warning( + logger.warning( f"STRAGGLER: Collaborator {collaborator_name} is reporting results " f"after task {task_name} has finished." ) return if self.round_number != round_number: - self.logger.warning( + logger.warning( f"Collaborator {collaborator_name} is reporting results" f" for the wrong round: {round_number}. Ignoring..." 
) return - self.logger.info( + logger.info( f"Collaborator {collaborator_name} is sending task results " f"for {task_name}, round {round_number}" ) @@ -631,7 +629,7 @@ def send_local_task_results( # we mustn't have results already if self._collaborator_task_completed(collaborator_name, task_name, round_number): - self.logger.warning( + logger.warning( f"Aggregator already has task results from collaborator {collaborator_name}" f" for task {task_key}" ) @@ -663,15 +661,6 @@ def send_local_task_results( "metric_value": float(value), } self.metric_queue.put(metrics) - self.logger.metric("%s", str(metrics)) - if self.write_logs: - self.log_metric( - collaborator_name, - task_name, - tensor_key.tensor_name, - float(value), - round_number, - ) task_results.append(tensor_key) @@ -699,7 +688,7 @@ def _end_of_round_with_stragglers_check(self): if collab_name not in self.collaborators_done ] if len(self.stragglers) != 0: - self.logger.warning(f"Identified stragglers: {self.stragglers}") + logger.warning(f"Identified stragglers: {self.stragglers}") self._end_of_round_check() def _process_named_tensor(self, named_tensor, collaborator_name): @@ -788,7 +777,7 @@ def _process_named_tensor(self, named_tensor, collaborator_name): assert final_nparray is not None, f"Could not create tensorkey {final_tensor_key}" self.tensor_db.cache_tensor({final_tensor_key: final_nparray}) - self.logger.debug("Created TensorKey: %s", final_tensor_key) + logger.debug("Created TensorKey: %s", final_tensor_key) return final_tensor_key, final_nparray @@ -846,7 +835,7 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result # Apply delta (unless delta couldn't be created) if base_model_nparray is not None and self.use_delta_updates: - self.logger.debug("Applying delta for layer %s", decompressed_delta_tk[0]) + logger.debug("Applying delta for layer %s", decompressed_delta_tk[0]) new_model_tk, new_model_nparray = self.tensor_codec.apply_delta( decompressed_delta_tk, decompressed_delta_nparray, @@ -879,11 +868,14 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result # Finally, cache the updated model tensor self.tensor_db.cache_tensor({final_model_tk: new_model_nparray}) - def _compute_validation_related_task_metrics(self, task_name): + def _compute_validation_related_task_metrics(self, task_name) -> dict: """Compute all validation related metrics. Args: task_name (str): Task name. + + Returns: + A dictionary of reportable metrics. """ # By default, print out all of the metrics that the validation # task sent @@ -918,6 +910,7 @@ def _compute_validation_related_task_metrics(self, task_name): task_agg_function = self.assigner.get_aggregation_type_for_task(task_name) task_key = TaskResultKey(task_name, collaborators_for_task[0], self.round_number) + metrics = {} for tensor_key in self.collaborator_tasks_results[task_key]: tensor_name, origin, round_number, report, tags = tensor_key assert ( @@ -934,32 +927,26 @@ def _compute_validation_related_task_metrics(self, task_name): ) if report: - # Caution: This schema must be followed. It is also used in - # gRPC message streams for director/envoy. 
- metrics = { - "metric_origin": "aggregator", - "task_name": task_name, - "metric_name": tensor_key.tensor_name, - "metric_value": float(agg_results), - "round": round_number, - } - - self.metric_queue.put(metrics) - self.logger.metric("%s", metrics) - if self.write_logs: - self.log_metric( - "aggregator", - task_name, - tensor_key.tensor_name, - float(agg_results), - round_number, - ) + # Metric must be a scalar. + value = float(agg_results) + + # TODO: Deprecate `metric_queue` going forward. + self.metric_queue.put( + { + "metric_origin": "aggregator", + "task_name": task_name, + "metric_name": tensor_key.tensor_name, + "metric_value": value, + "round": round_number, + } + ) + metrics.update({f"aggregator/{task_name}/{tensor_key.tensor_name}": value}) # FIXME: Configurable logic for min/max criteria in saving best. if "validate_agg" in tags: # Compare the accuracy of the model, potentially save it if self.best_model_score is None or self.best_model_score < agg_results: - self.logger.metric( + logger.info( f"Round {round_number}: saved the best " f"model with score {agg_results:f}" ) @@ -968,6 +955,8 @@ def _compute_validation_related_task_metrics(self, task_name): if "trained" in tags: self._prepare_trained(tensor_name, origin, round_number, report, agg_results) + return metrics + def _end_of_round_check(self): """Check if the round complete. @@ -985,22 +974,18 @@ def _end_of_round_check(self): return # Compute all validation related metrics - all_tasks = self.assigner.get_all_tasks_for_round(self.round_number) - for task_name in all_tasks: - self._compute_validation_related_task_metrics(task_name) + logs = {} + for task_name in self.assigner.get_all_tasks_for_round(self.round_number): + logs.update(self._compute_validation_related_task_metrics(task_name)) - if self.log_memory_usage: - # This is the place to check the memory usage of the aggregator - memory_detail = get_memory_usage() - memory_detail["round_number"] = self.round_number - memory_detail["metric_origin"] = "aggregator" - self.memory_details.append(memory_detail) + # End of round callbacks. + self.callbacks.on_round_end(self.round_number, logs) # Once all of the task results have been processed self._end_of_round_check_done[self.round_number] = True # Save the latest model - self.logger.info("Saving round %s model...", self.round_number) + logger.info("Saving round %s model...", self.round_number) self._save_model(self.round_number, self.last_state_path) self.round_number += 1 @@ -1011,12 +996,11 @@ def _end_of_round_check(self): # TODO This needs to be fixed! if self._time_to_quit(): - if self.log_memory_usage: - self.logger.info(f"Publish memory usage: {self.memory_details}") - write_memory_usage_to_file(self.memory_details, "aggregator_memory_usage.json") - self.logger.info("Experiment Completed. Cleaning up...") + logger.info("Experiment Completed. Cleaning up...") else: - self.logger.info("Starting round %s...", self.round_number) + logger.info("Starting round %s...", self.round_number) + # https://github.com/securefederatedai/openfl/pull/1195#discussion_r1879479537 + self.callbacks.on_round_begin(self.round_number) # Cleaning tensor db self.tensor_db.clean_up(self.db_store_rounds) @@ -1036,7 +1020,7 @@ def _is_collaborator_done(self, collaborator_name: str, round_number: int) -> No None """ if self.round_number != round_number: - self.logger.warning( + logger.warning( f"Collaborator {collaborator_name} is reporting results" f" for the wrong round: {round_number}. Ignoring..." 
) @@ -1056,7 +1040,7 @@ def _is_collaborator_done(self, collaborator_name: str, round_number: int) -> No # update collaborators_done if all_tasks_completed: self.collaborators_done.append(collaborator_name) - self.logger.info( + logger.info( f"Round {self.round_number}: Collaborators that have completed all tasks: " f"{self.collaborators_done}" ) @@ -1070,16 +1054,19 @@ def stop(self, failed_collaborator: str = None) -> None: Returns: None """ - self.logger.info("Force stopping the aggregator execution.") + logger.info("Force stopping the aggregator execution.") # We imitate quit_job_sent_to the failed collaborator # So the experiment set to a finished state if failed_collaborator: self.quit_job_sent_to.append(failed_collaborator) + # End of experiment callbacks. + self.callbacks.on_experiment_end() + # This code does not actually send `quit` tasks to collaborators, # it just mimics it by filling arrays. for collaborator_name in filter(lambda c: c != failed_collaborator, self.authorized_cols): - self.logger.info( + logger.info( "Sending signal to collaborator %s to shutdown...", collaborator_name, ) diff --git a/openfl/component/collaborator/collaborator.py b/openfl/component/collaborator/collaborator.py index 08f19b9d94..d4fd380998 100644 --- a/openfl/component/collaborator/collaborator.py +++ b/openfl/component/collaborator/collaborator.py @@ -4,16 +4,18 @@ """Collaborator module.""" +import logging from enum import Enum -from logging import getLogger from time import sleep -from typing import Tuple +from typing import List, Optional, Tuple +import openfl.callbacks as callbacks_module from openfl.databases import TensorDB from openfl.pipelines import NoCompressionPipeline, TensorCodec from openfl.protocols import utils from openfl.utilities import TensorKey -from openfl.utilities.logs import get_memory_usage, write_memory_usage_to_file + +logger = logging.getLogger(__name__) class DevicePolicy(Enum): @@ -82,6 +84,8 @@ def __init__( compression_pipeline=None, db_store_rounds=1, log_memory_usage=False, + write_logs=False, + callbacks: Optional[List] = None, ): """Initialize the Collaborator object. @@ -103,6 +107,7 @@ def __init__( Defaults to None. db_store_rounds (int, optional): The number of rounds to store in the database. Defaults to 1. + callbacks (list, optional): List of callbacks. Defaults to None. """ self.single_col_cert_common_name = None @@ -123,31 +128,34 @@ def __init__( self.delta_updates = delta_updates self.client = client - # Flag can be enabled to get memory usage details for ubuntu system - self.log_memory_usage = log_memory_usage - self.task_config = task_config - self.logger = getLogger(__name__) + self.task_config = task_config # RESET/CONTINUE_LOCAL/CONTINUE_GLOBAL if hasattr(OptTreatment, opt_treatment): self.opt_treatment = OptTreatment[opt_treatment] else: - self.logger.error("Unknown opt_treatment: %s.", opt_treatment.name) + logger.error("Unknown opt_treatment: %s.", opt_treatment.name) raise NotImplementedError(f"Unknown opt_treatment: {opt_treatment}.") if hasattr(DevicePolicy, device_assignment_policy): self.device_assignment_policy = DevicePolicy[device_assignment_policy] else: - self.logger.error( - "Unknown device_assignment_policy: " f"{device_assignment_policy.name}." - ) + logger.error("Unknown device_assignment_policy: " f"{device_assignment_policy.name}.") raise NotImplementedError( f"Unknown device_assignment_policy: {device_assignment_policy}." 
) self.task_runner.set_optimizer_treatment(self.opt_treatment.name) + # Callbacks + self.callbacks = callbacks_module.CallbackList( + callbacks, + add_memory_profiler=log_memory_usage, + add_metric_writer=write_logs, + origin=self.collaborator_name, + ) + def set_available_devices(self, cuda: Tuple[str] = ()): """Set available CUDA devices. @@ -159,33 +167,36 @@ def set_available_devices(self, cuda: Tuple[str] = ()): def run(self): """Run the collaborator.""" - memory_details = [] + # Experiment begin + self.callbacks.on_experiment_begin() + while True: - tasks, round_number, sleep_time, time_to_quit = self.get_tasks() + tasks, round_num, sleep_time, time_to_quit = self.get_tasks() + if time_to_quit: break - elif sleep_time > 0: - sleep(sleep_time) # some sleep function - else: - self.logger.info("Received the following tasks: %s", tasks) - for task in tasks: - self.do_task(task, round_number) - # Cleaning tensor db - self.tensor_db.clean_up(self.db_store_rounds) - if self.log_memory_usage: - # This is the place to check the memory usage of the collaborator - memory_detail = get_memory_usage() - memory_detail["round_number"] = round_number - memory_detail["metric_origin"] = self.collaborator_name - memory_details.append(memory_detail) - if self.log_memory_usage: - self.logger.info(f"Publish memory usage: {memory_details}") - write_memory_usage_to_file( - memory_details, f"{self.collaborator_name}_memory_usage.json" - ) + if not tasks: + sleep(sleep_time) + continue + + # Round begin + logger.info("Received Tasks: %s", tasks) + self.callbacks.on_round_begin(round_num) + + # Run tasks + logs = {} + for task in tasks: + metrics = self.do_task(task, round_num) + logs.update(metrics) - self.logger.info("End of Federation reached. Exiting...") + # Round end + self.tensor_db.clean_up(self.db_store_rounds) + self.callbacks.on_round_end(round_num, logs) + + # Experiment end + self.callbacks.on_experiment_end() + logger.info("Received shutdown signal. Exiting...") def run_simulation(self): """Specific function for the simulation. @@ -196,15 +207,15 @@ def run_simulation(self): while True: tasks, round_number, sleep_time, time_to_quit = self.get_tasks() if time_to_quit: - self.logger.info("End of Federation reached. Exiting...") + logger.info("End of Federation reached. Exiting...") break elif sleep_time > 0: sleep(sleep_time) # some sleep function else: - self.logger.info("Received the following tasks: %s", tasks) + logger.info("Received the following tasks: %s", tasks) for task in tasks: self.do_task(task, round_number) - self.logger.info( + logger.info( f"All tasks completed on {self.collaborator_name} " f"for round {round_number}..." ) @@ -220,19 +231,22 @@ def get_tasks(self): time_to_quit (bool): bool value for quit. """ # logging wait time to analyze training process - self.logger.info("Waiting for tasks...") + logger.info("Waiting for tasks...") tasks, round_number, sleep_time, time_to_quit = self.client.get_tasks( self.collaborator_name ) return tasks, round_number, sleep_time, time_to_quit - def do_task(self, task, round_number): + def do_task(self, task, round_number) -> dict: """Perform the specified task. Args: task (list_of_str): List of tasks. round_number (int): Actual round number. + + Returns: + A dictionary of reportable metrics of the current collaborator for the task. 
""" # map this task to an actual function name and kwargs if hasattr(self.task_runner, "TASK_REGISTRY"): @@ -288,7 +302,7 @@ def do_task(self, task, round_number): # New interactive python API # New `Core` TaskRunner contains registry of tasks func = self.task_runner.TASK_REGISTRY[func_name] - self.logger.debug("Using Interactive Python API") + logger.debug("Using Interactive Python API") # So far 'kwargs' contained parameters read from the plan # those are parameters that the eperiment owner registered for @@ -306,7 +320,7 @@ def do_task(self, task, round_number): # TaskRunner subclassing API # Tasks are defined as methods of TaskRunner func = getattr(self.task_runner, func_name) - self.logger.debug("Using TaskRunner subclassing API") + logger.debug("Using TaskRunner subclassing API") global_output_tensor_dict, local_output_tensor_dict = func( col_name=self.collaborator_name, @@ -321,7 +335,8 @@ def do_task(self, task, round_number): # send the results for this tasks; delta and compression will occur in # this function - self.send_task_results(global_output_tensor_dict, round_number, task_name) + metrics = self.send_task_results(global_output_tensor_dict, round_number, task_name) + return metrics def get_numpy_dict_for_tensorkeys(self, tensor_keys): """Get tensor dictionary for specified tensorkey set. @@ -345,13 +360,13 @@ def get_data_for_tensorkey(self, tensor_key): """ # try to get from the store tensor_name, origin, round_number, report, tags = tensor_key - self.logger.debug("Attempting to retrieve tensor %s from local store", tensor_key) + logger.debug("Attempting to retrieve tensor %s from local store", tensor_key) nparray = self.tensor_db.get_tensor_from_cache(tensor_key) # if None and origin is our client, request it from the client if nparray is None: if origin == self.collaborator_name: - self.logger.info( + logger.info( f"Attempting to find locally stored {tensor_name} tensor from prior round..." ) prior_round = round_number - 1 @@ -360,16 +375,14 @@ def get_data_for_tensorkey(self, tensor_key): TensorKey(tensor_name, origin, prior_round, report, tags) ) if nparray is not None: - self.logger.debug( + logger.debug( f"Found tensor {tensor_name} in local TensorDB " f"for round {prior_round}" ) return nparray prior_round -= 1 - self.logger.info( - f"Cannot find any prior version of tensor {tensor_name} locally..." - ) - self.logger.debug( + logger.info(f"Cannot find any prior version of tensor {tensor_name} locally...") + logger.debug( "Unable to get tensor from local store..." "attempting to retrieve from client" ) # Determine whether there are additional compression related @@ -397,7 +410,7 @@ def get_data_for_tensorkey(self, tensor_key): ) self.tensor_db.cache_tensor({new_model_tk: nparray}) else: - self.logger.info( + logger.info( "Count not find previous model layer." 
"Fetching latest layer from aggregator" ) @@ -411,7 +424,7 @@ def get_data_for_tensorkey(self, tensor_key): tensor_key, require_lossless=True ) else: - self.logger.debug("Found tensor %s in local TensorDB", tensor_key) + logger.debug("Found tensor %s in local TensorDB", tensor_key) return nparray @@ -437,7 +450,7 @@ def get_aggregated_tensor_from_aggregator(self, tensor_key, require_lossless=Fal """ tensor_name, origin, round_number, report, tags = tensor_key - self.logger.debug("Requesting aggregated tensor %s", tensor_key) + logger.debug("Requesting aggregated tensor %s", tensor_key) tensor = self.client.get_aggregated_tensor( self.collaborator_name, tensor_name, @@ -456,13 +469,16 @@ def get_aggregated_tensor_from_aggregator(self, tensor_key, require_lossless=Fal return nparray - def send_task_results(self, tensor_dict, round_number, task_name): + def send_task_results(self, tensor_dict, round_number, task_name) -> dict: """Send task results to the aggregator. Args: tensor_dict (dict): Tensor dictionary. round_number (int): Actual round number. task_name (string): Task name. + + Returns: + A dictionary of reportable metrics of the current collaborator for the task. """ named_tensors = [self.nparray_to_named_tensor(k, v) for k, v in tensor_dict.items()] @@ -477,17 +493,16 @@ def send_task_results(self, tensor_dict, round_number, task_name): if "valid" in task_name: data_size = self.task_runner.get_valid_data_size() - self.logger.debug("%s data size = %s", task_name, data_size) + logger.debug("%s data size = %s", task_name, data_size) + metrics = {} for tensor in tensor_dict: tensor_name, origin, fl_round, report, tags = tensor if report: - self.logger.metric( - f"Round {round_number}, collaborator {self.collaborator_name} " - f"is sending metric for task {task_name}:" - f" {tensor_name}\t{tensor_dict[tensor]:f}" - ) + # Reportable metric must be a scalar + value = float(tensor_dict[tensor]) + metrics.update({f"{self.collaborator_name}/{task_name}/{tensor_name}": value}) self.client.send_local_task_results( self.collaborator_name, @@ -497,6 +512,8 @@ def send_task_results(self, tensor_dict, round_number, task_name): named_tensors, ) + return metrics + def nparray_to_named_tensor(self, tensor_key, nparray): """Construct the NamedTensor Protobuf. 
@@ -579,7 +596,7 @@ def named_tensor_to_nparray(self, named_tensor): named_tensor.report, tuple(named_tensor.tags), ) - tensor_name, origin, round_number, report, tags = tensor_key + *_, tags = tensor_key if "compressed" in tags: decompressed_tensor_key, decompressed_nparray = self.tensor_codec.decompress( tensor_key, @@ -594,7 +611,7 @@ def named_tensor_to_nparray(self, named_tensor): else: # There could be a case where the compression pipeline is bypassed # entirely - self.logger.warning("Bypassing tensor codec...") + logger.warning("Bypassing tensor codec...") decompressed_tensor_key = tensor_key decompressed_nparray = raw_bytes diff --git a/openfl/federated/plan/plan.py b/openfl/federated/plan/plan.py index 34c50a4d1e..69ff36c19c 100644 --- a/openfl/federated/plan/plan.py +++ b/openfl/federated/plan/plan.py @@ -391,18 +391,9 @@ def get_aggregator(self, tensor_dict=None): defaults[SETTINGS]["assigner"] = self.get_assigner() defaults[SETTINGS]["compression_pipeline"] = self.get_tensor_pipe() defaults[SETTINGS]["straggler_handling_policy"] = self.get_straggler_handling_policy() - log_metric_callback = defaults[SETTINGS].get("log_metric_callback") - - if log_metric_callback: - if isinstance(log_metric_callback, dict): - log_metric_callback = Plan.import_(**log_metric_callback) - elif not callable(log_metric_callback): - raise TypeError( - f"log_metric_callback should be callable object " - f"or be import from code part, get {log_metric_callback}" - ) - defaults[SETTINGS]["log_metric_callback"] = log_metric_callback + # TODO: Load callbacks from plan. + if self.aggregator_ is None: self.aggregator_ = Plan.build(**defaults, initial_tensor_dict=tensor_dict) @@ -577,6 +568,8 @@ def get_collaborator( defaults[SETTINGS]["aggregator_uuid"] = self.aggregator_uuid defaults[SETTINGS]["federation_uuid"] = self.federation_uuid + # TODO: Load callbacks from the plan. + if task_runner is not None: defaults[SETTINGS]["task_runner"] = task_runner else: diff --git a/openfl/transport/grpc/aggregator_server.py b/openfl/transport/grpc/aggregator_server.py index bfae10351b..12f658a0aa 100644 --- a/openfl/transport/grpc/aggregator_server.py +++ b/openfl/transport/grpc/aggregator_server.py @@ -70,7 +70,6 @@ def __init__( TLS connection. **kwargs: Additional keyword arguments. """ - print(f"{use_tls=}") self.aggregator = aggregator self.uri = f"[::]:{agg_port}" self.use_tls = use_tls diff --git a/openfl/utilities/logs.py b/openfl/utilities/logs.py index ce64b5f7fb..3798f2bc90 100644 --- a/openfl/utilities/logs.py +++ b/openfl/utilities/logs.py @@ -4,43 +4,10 @@ """Logs utilities.""" -import json import logging -import os -import psutil from rich.console import Console from rich.logging import RichHandler -from tensorboardX import SummaryWriter - -writer = None - - -def get_writer(): - """Create global writer object. - - This function creates a global `SummaryWriter` object for logging to - TensorBoard. - """ - global writer - if not writer: - writer = SummaryWriter("./logs/tensorboard", flush_secs=5) - - -def write_metric(node_name, task_name, metric_name, metric, round_number): - """Write metric callback. - - This function logs a metric to TensorBoard. - - Args: - node_name (str): The name of the node. - task_name (str): The name of the task. - metric_name (str): The name of the metric. - metric (float): The value of the metric. - round_number (int): The current round number. 
- """ - get_writer() - writer.add_scalar(f"{node_name}/{task_name}/{metric_name}", metric, round_number) def setup_loggers(log_level=logging.INFO): @@ -60,52 +27,3 @@ def setup_loggers(log_level=logging.INFO): formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] - %(message)s") handler.setFormatter(formatter) root.addHandler(handler) - - -def get_memory_usage() -> dict: - """Return memory usage details of the current process. - - Returns: - dict: A dictionary containing memory usage details. - """ - process = psutil.Process(os.getpid()) - virtual_memory = psutil.virtual_memory() - swap_memory = psutil.swap_memory() - memory_usage = { - "process_memory": round(process.memory_info().rss / (1024**2), 2), - "virtual_memory": { - "total": round(virtual_memory.total / (1024**2), 2), - "available": round(virtual_memory.available / (1024**2), 2), - "percent": virtual_memory.percent, - "used": round(virtual_memory.used / (1024**2), 2), - "free": round(virtual_memory.free / (1024**2), 2), - "active": round(virtual_memory.active / (1024**2), 2), - "inactive": round(virtual_memory.inactive / (1024**2), 2), - "buffers": round(virtual_memory.buffers / (1024**2), 2), - "cached": round(virtual_memory.cached / (1024**2), 2), - "shared": round(virtual_memory.shared / (1024**2), 2), - }, - "swap_memory": { - "total": round(swap_memory.total / (1024**2), 2), - "used": round(swap_memory.used / (1024**2), 2), - "free": round(swap_memory.free / (1024**2), 2), - "percent": swap_memory.percent, - }, - } - return memory_usage - - -def write_memory_usage_to_file(memory_usage_dict, file_name): - """ - Write memory usage details to a file. - - Args: - memory_usage_dict (dict): The memory usage details to write. - file_name (str): The name of the file to write to. 
- - Returns: - None - """ - file_path = os.path.join("logs", file_name) - with open(file_path, "w") as f: - json.dump(memory_usage_dict, f, indent=4) diff --git a/setup.py b/setup.py index 8e2d1b1c68..9bad746c74 100644 --- a/setup.py +++ b/setup.py @@ -95,7 +95,7 @@ def run(self): 'protobuf>=4.22,<6.0.0', 'grpcio>=1.56.2,<1.66.0', ], - python_requires='>=3.9, <3.12', + python_requires='>=3.10, <3.13', project_urls={ 'Bug Tracker': 'https://github.com/securefederatedai/openfl/issues', 'Documentation': 'https://openfl.readthedocs.io/en/stable/', @@ -109,9 +109,9 @@ def run(self): 'Topic :: System :: Distributed Computing', 'License :: OSI Approved :: Apache Software License', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', ], entry_points={'console_scripts': ['fx=openfl.interface.cli:entry']}, cmdclass={ diff --git a/tests/github/experimental/workflow/LocalRuntime/requirements_experimental_localruntime_tests.txt b/tests/github/experimental/workflow/LocalRuntime/requirements_experimental_localruntime_tests.txt index e487d1aeb6..daeb192878 100644 --- a/tests/github/experimental/workflow/LocalRuntime/requirements_experimental_localruntime_tests.txt +++ b/tests/github/experimental/workflow/LocalRuntime/requirements_experimental_localruntime_tests.txt @@ -2,4 +2,8 @@ dill==0.3.6 metaflow==2.7.15 ray==2.9.2 torch +tabulate==0.9.0 torchvision +nbformat==5.10.4 +nbdev==2.3.12 +tensorboard \ No newline at end of file diff --git a/tests/openfl/component/collaborator/test_collaborator.py b/tests/openfl/component/collaborator/test_collaborator.py index a136fe2cba..88a60015c6 100644 --- a/tests/openfl/component/collaborator/test_collaborator.py +++ b/tests/openfl/component/collaborator/test_collaborator.py @@ -133,7 +133,6 @@ def test_send_task_results(collaborator_mock, tensor_key): data_size = -1 collaborator_mock.nparray_to_named_tensor = mock.Mock(return_value=None) collaborator_mock.client.send_local_task_results = mock.Mock() - collaborator_mock.logger.metric = mock.Mock() collaborator_mock.send_task_results(tensor_dict, round_number, task_name) collaborator_mock.client.send_local_task_results.assert_called_with( @@ -286,7 +285,7 @@ def test_run(collaborator_mock): collaborator_mock.get_tasks = mock.Mock() collaborator_mock.get_tasks.side_effect = [(['task'], round_number, 0, False), (['task'], round_number, 0, True)] - collaborator_mock.do_task = mock.Mock() + collaborator_mock.do_task = mock.Mock(return_value={'metric': 0.0}) collaborator_mock.run() collaborator_mock.do_task.assert_called_with('task', round_number)
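The hunks above wire four callback hooks through both components: on_experiment_begin, on_round_begin(round_num), on_round_end(round_num, logs) and on_experiment_end, with `logs` carrying the reportable scalars keyed as "{origin}/{task_name}/{tensor_name}". A minimal sketch of a user-defined callback built on those hooks follows. It assumes that the `Callback` base class added in openfl/callbacks/callback.py is exported from the `openfl.callbacks` package and that per-callback hook signatures mirror the `CallbackList` calls shown above; neither detail is visible in this excerpt, so treat the snippet as an approximation rather than the authoritative API.

# Sketch only: a custom callback tracking the best value of one reported metric.
# The `Callback` import path, its constructor, and the hook signatures are
# assumed from the files added by this patch; the metric key used below is a
# hypothetical example.
import logging

from openfl.callbacks import Callback

logger = logging.getLogger(__name__)


class BestMetricTracker(Callback):
    """Track the best value of a chosen reportable metric across rounds."""

    def __init__(self, metric_key):
        super().__init__()
        # e.g. "collaborator1/aggregated_model_validation/accuracy"
        self.metric_key = metric_key
        self.best = None

    def on_round_end(self, round_num, logs=None):
        # `logs` is the dict of scalars assembled in do_task() /
        # _compute_validation_related_task_metrics() above.
        value = (logs or {}).get(self.metric_key)
        if value is not None and (self.best is None or value > self.best):
            self.best = value
            logger.info("Round %s: new best %s = %f", round_num, self.metric_key, value)

    def on_experiment_end(self, logs=None):
        logger.info("Best %s over the experiment: %s", self.metric_key, self.best)


# Hypothetical wiring through the new `callbacks` argument introduced above;
# in practice this would normally come from the plan once the TODOs in
# plan.py are resolved:
# collaborator = Collaborator(..., callbacks=[BestMetricTracker("col1/train/loss")])

The metric key in the sketch follows the same "{origin}/{task_name}/{tensor_name}" convention that the aggregator and collaborator changes above use when populating `logs`, so a callback written against one side should read identically on the other.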