
Introduce callbacks API #1195

Merged

Conversation

@MasterSkepticista (Collaborator) commented Dec 5, 2024

This PR introduces callbacks, an abstract way of running user-defined actions in TaskRunner or Workflow APIs.

Callbacks can be used to perform actions at different stages of the Federated Learning process. To create a custom callback, a user or developer subclasses openfl.callbacks.Callback and implements the methods they need. This creates a clean separation between framework and user code, and provides a step-function improvement in the time taken to implement ad-hoc tasks, since users no longer have to spend time contextualizing framework internals.
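As a sketch of what subclassing looks like (hook names follow the LambdaCallback example later in this PR; the stub base class below stands in for openfl.callbacks.Callback so the snippet runs standalone, and RoundTimer is an invented example, not part of openfl):

```python
import time

# Stand-in for openfl.callbacks.Callback so this sketch is self-contained;
# the real base class and exact hook names are defined by openfl.
class Callback:
    def on_experiment_begin(self): ...
    def on_round_begin(self, round_num, logs=None): ...
    def on_round_end(self, round_num, logs=None): ...
    def on_experiment_end(self): ...

class RoundTimer(Callback):
    """Custom callback: records how long each round takes."""
    def __init__(self):
        self.durations = {}

    def on_round_begin(self, round_num, logs=None):
        # Remember when this round started.
        self._t0 = time.monotonic()

    def on_round_end(self, round_num, logs=None):
        # Store the elapsed wall-clock time for this round.
        self.durations[round_num] = time.monotonic() - self._t0
```

A framework component holding this callback would simply invoke the hooks at the corresponding points of the experiment loop.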

Callbacks can be triggered on the aggregator and collaborator side for the following events:

  • At the beginning of an experiment, e.g. for one-off setups, or state recovery
  • At the beginning of every round, e.g. for inspecting aggregated models
  • At the end of every round, e.g. for saving model weights
  • At the end of an experiment, e.g. to serve/upload artifacts

With this abstraction, common user-facing actions can be instrumented in the experiment without having to modify the source code. Here is an example:

# Define actions on Aggregator-side
aggregator = openfl.component.Aggregator(..., callbacks=[
    openfl.callbacks.MetricWriter("./logs", use_tensorboard=True), 
    openfl.callbacks.LambdaCallback(on_experiment_end=lambda: _start_model_server(exit_in_minutes=10)),
    openfl.callbacks.ModelCheckpoint("./save", task="aggregated_model_validation", monitor="loss", mode="min"),
])

# Define actions on Collaborator-side
collaborator = openfl.component.Collaborator(..., callbacks=[
    openfl.callbacks.MemoryProfiler("./logs"), 
    openfl.callbacks.LambdaCallback(
        on_experiment_start=lambda: print("Experiment started!"),
        on_round_begin=lambda round_num, _: print(f"Starting round {round_num}!"),
        on_round_end=lambda round_num, _: print(f"End of round {round_num}!"),
        on_experiment_end=lambda: print("Experiment ended successfully!"),
    ),
    openfl.callbacks.Recover("./snapshots"),  # State recovery before experiment starts, save on round end
])

This PR provides built-in support for:

  • MemoryProfiler: to use, set log_memory_usage: true in the plan file. Supported on both the aggregator and collaborator sides.
  • LambdaCallback: for arbitrary user-defined functions.
  • MetricWriter: supports both TensorBoard and plain-text logging. To use, set write_logs: true in the plan file. Supported on both the aggregator and collaborator sides.

Future:

  • ModelCheckpoint: a new metric-configurable checkpointing logic
  • BackupAndRestore: a new state recovery and saving technique, for all participant resiliency (coming soon)
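Purely as an illustration of the metric-configurable logic that ModelCheckpoint(monitor="loss", mode="min") in the example above hints at (this checkpointing logic is future work in the PR, and the helper class below is hypothetical):

```python
import math

class BestMetricTracker:
    """Hypothetical helper: decide whether the monitored metric improved."""
    def __init__(self, monitor="loss", mode="min"):
        self.monitor = monitor
        # mode="min" means smaller is better (e.g. loss);
        # mode="max" means larger is better (e.g. accuracy).
        self.improved = (lambda a, b: a < b) if mode == "min" else (lambda a, b: a > b)
        self.best = math.inf if mode == "min" else -math.inf

    def should_save(self, logs):
        # Return True (and update the best value) only when the
        # monitored metric is present and strictly improved.
        value = logs.get(self.monitor)
        if value is not None and self.improved(value, self.best):
            self.best = value
            return True
        return False
```

A checkpoint callback could call should_save(logs) inside on_round_end and write model weights only on improvement.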

Signed-off-by: Shah, Karan <[email protected]>
@MasterSkepticista MasterSkepticista changed the title Dummy PR for hot-staging Introduce callbacks API Dec 10, 2024
@rahulga1 (Collaborator) left a comment

@payalcha @noopurintel please check if it impacts any of your test for memory related info?

Review threads (resolved):
  • openfl/callbacks/callback_list.py (3 threads)
  • openfl/component/aggregator/aggregator.py
  • openfl/component/collaborator/collaborator.py (2 threads, one outdated)
@theakshaypant (Collaborator) left a comment

Had some questions related to the callbacks API.

Review threads:
  • openfl/callbacks/callback_list.py (2 threads, one outdated)
Collaborator

Would there be no use case for on_experiment_begin, on_round_begin, or on_experiment_end on the aggregator side, given that they are not invoked here?

@MasterSkepticista (Collaborator, Author) Dec 11, 2024

Good question. I don't know, yet.

Aggregator does not seem to have a "clear" point of round begin. It just starts, and collaborators start calling RPCs asynchronously. Same for experiment begin. An argument could be made to call on_experiment_begin during the initialization of aggregator. Lots of loose ends, I am looking for an elegant approach here. WDYT?

Collaborator

For on_experiment_begin, I was thinking along the same lines: it could be done when the aggregator is initialised. Similarly, on_experiment_end could be denoted when the aggregator is being stopped (given that all tasks have been completed on collaborators), but one could argue the semantics of when the experiment actually begins or ends.
I only have a non-elegant way to verify on_round_begin for aggregators, where a flag can be used to check the first time get_tasks is called in each round.

Collaborator (Author)

The on_experiment_{begin,end} proposals are reasonable: during __init__ and stop(...).

Let's think more about on_round_begin. Can it become elegant if we take the liberty of minor refactoring?

Collaborator

Similar to Aggregator._end_of_round_check, we can introduce a _beginning_of_round_check which is called at the start of get_tasks unless it is time_to_quit.

We can log (read "store") when a collaborator calls get_tasks for each round. If the round number is not logged (again, read "stored"), we know this is the first get_tasks for the round and hence the beginning of the round.

The storage mentioned can be done in an in-memory dict similar to other attributes (such as collaborator_tasks_results) of the Aggregator.
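The proposal above could look roughly like this (a hedged sketch; the real Aggregator has many more responsibilities, and the name _rounds_begun is invented here for illustration):

```python
class Aggregator:
    """Stripped-down sketch: fire on_round_begin on the first get_tasks of a round."""
    def __init__(self, callbacks=None):
        self.round_number = 0
        self.callbacks = callbacks or []
        # In-memory store of rounds already begun, analogous to other
        # Aggregator attributes such as collaborator_tasks_results.
        self._rounds_begun = set()

    def _beginning_of_round_check(self):
        # The first get_tasks call of a round marks the beginning of the round.
        if self.round_number not in self._rounds_begun:
            self._rounds_begun.add(self.round_number)
            for cb in self.callbacks:
                cb.on_round_begin(self.round_number)

    def get_tasks(self, collaborator_name):
        self._beginning_of_round_check()
        return []  # the real method returns the tasks for this collaborator
```

Subsequent get_tasks calls within the same round find the round number already stored and skip the callbacks.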

@MasterSkepticista (Collaborator, Author) Dec 21, 2024

Using this for now: 1cffb9d

Edit: Please review once too.

@psfoley (Contributor) commented Dec 21, 2024

@MasterSkepticista Are the 3 remaining unresolved items in the checklist (TensorBoard, ModelCheckpoint, BackupAndRestore) intended to be addressed before merge?

@MasterSkepticista (Collaborator, Author)

@psfoley ModelCheckpoint and state backup/restore are big-ticket items. Tensorboard support is added via MetricWriter callback. Updated PR description for usage methods.

This PR does not introduce any new plan arguments, for backwards compatibility. Callbacks will be exposed to users in a follow-up PR. So, ready to merge.

@MasterSkepticista MasterSkepticista added the pull_ready Ready to merge. label Dec 21, 2024
@theakshaypant (Collaborator) left a comment

Have a couple of questions which might be out of scope for this PR but still wanted to highlight them.

Review thread on openfl/component/aggregator/aggregator.py (outdated, resolved).
def set_tensor_db(self, tensor_db):
    self.tensor_db = tensor_db

def on_round_begin(self, round_num: int, logs=None):
Collaborator

Based on my read-through, I see that the logs named arg is only used by MetricWriter.
For extensibility, would it make more sense to have a **kwargs instead?
We would still need to consider how these "dynamic" named args are sent by the aggregator/collaborator, but this could perhaps be thought through in subsequent changes to the callbacks API.

Collaborator (Author)

For extensibility, would it make more sense to have a **kwargs instead?

Generally speaking, **kwargs should be avoided. To pass information that specific callbacks would need, we have params that are declared at the time of instantiating callbacks list. This makes it clear what attributes callbacks will have access to, internally. Dynamic setting makes it harder to inspect and test code.

tensor_db: Optional `TensorDB` instance of the respective participant.
If provided, callbacks can access TensorDB for various actions.
params: Additional parameters saved for use within the callbacks.

Currently, logs is the default because most callbacks trigger actions based on metrics: say, writing logs to a file, triggering a checkpoint save if logs["loss"] < 0.2, or writing metrics to TensorBoard.

That said, I agree that we can expand this argument list further if more callbacks would benefit from it.
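A rough sketch of the design described here, where shared state is declared when the callback container is built rather than passed dynamically via **kwargs (names mirror the docstring quoted above, but the real openfl CallbackList may differ):

```python
class CallbackList:
    """Hypothetical container: sets declared attributes on every callback up front."""
    def __init__(self, callbacks, tensor_db=None, **params):
        self.callbacks = list(callbacks)
        for cb in self.callbacks:
            cb.tensor_db = tensor_db  # explicit, inspectable shared state
            cb.params = params        # extra parameters declared at build time

    def on_round_end(self, round_num, logs=None):
        # Fan the event out to every callback; `logs` carries the round's metrics.
        for cb in self.callbacks:
            cb.on_round_end(round_num, logs)
```

Because tensor_db and params are plain attributes set at construction, a test can inspect exactly what each callback has access to, which is the point made above about avoiding dynamic **kwargs.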

@psfoley psfoley merged commit c280f10 into securefederatedai:develop Dec 23, 2024
25 checks passed
psfoley added a commit that referenced this pull request Dec 23, 2024

* Bump Python version to 3.10 - 3.12 (#1213)
  * update python version to 3.12
  * dummy commit
  * dummy commit
  * update python version to 3.12
  * dummy commit
  * dummy commit
  * removed files
  * reverted doc change
  * added missing requirements for Workflow Interface Tests
  * added tensorboard

* Introduce `callbacks` API (#1195)
  * Get rid of kwargs
  * Use module-level logger
  * Reduce keras verbosity
  * Remove all log_metric and log_memory_usage traces; add callback hooks
  * Add `openfl.callbacks` module
  * Include round_num for task callbacks
  * Add tensordb to callbacks
  * No round_num on task callbacks
  * Remove task boundary callbacks
  * Remove tb/model_ckpt. Add memory_profiler
  * Restore psutil and tbX
  * Format code
  * Define default callbacks
  * Add write_logs for bwd compat
  * Add log_metric_callback for bwd compat
  * Migrate to module-level logger for collaborator
  * Review comments
  * Add metric_writer
  * Add collaborator side metric logging
  * Make log dirs on exp begin
  * Do not print use_tls
  * Assume reportable metric to be a scalar
  * Add aggregator side callbacks
  * do_task test returns mock dict
  * Consistency changes
  * Add documentation hooks
  * Update docstring
  * Update docs hook
  * Remove all traces of log_metric_callback and write_metric
  * Do on_round_begin if not time_to_quit

Signed-off-by: yes <[email protected]>
Signed-off-by: Shah, Karan <[email protected]>
Co-authored-by: Shailesh Tanwar <[email protected]>
Co-authored-by: Karan Shah <[email protected]>