Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Leak Logs #1159

Merged
merged 15 commits into from
Nov 21, 2024
68 changes: 67 additions & 1 deletion openfl/component/aggregator/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@


"""Aggregator module."""
import json
import queue
import time
from logging import getLogger
from threading import Lock

import psutil

from openfl.component.straggler_handling_functions import CutoffTimeBasedStragglerHandling
from openfl.databases import TensorDB
from openfl.interface.aggregation_functions import WeightedAverage
Expand All @@ -16,6 +19,8 @@
from openfl.utilities import TaskResultKey, TensorKey, change_tags
from openfl.utilities.logs import write_metric

AGG_MEM_FILE_NAME = "agg_mem_details.json"


class Aggregator:
"""An Aggregator is the central node in federated learning.
Expand Down Expand Up @@ -75,6 +80,7 @@ def __init__(
compression_pipeline=None,
db_store_rounds=1,
write_logs=False,
log_memory_usage=False,
log_metric_callback=None,
**kwargs,
):
Expand Down Expand Up @@ -122,7 +128,10 @@ def __init__(
)
self._end_of_round_check_done = [False] * rounds_to_train
self.stragglers = []

self.log_memory_usage = (
log_memory_usage  # Flag can be enabled to get memory usage details on Ubuntu systems
)
self.memory_details = []
self.rounds_to_train = rounds_to_train

# if the collaborator requests a delta, this value is set to true
Expand Down Expand Up @@ -667,6 +676,53 @@ def send_local_task_results(

self._end_of_round_with_stragglers_check()

def get_memory_usage(self, round_number, metric_origin):
    """Collect and log memory usage statistics for the given round.

    Uses psutil to gather the process RSS plus system-wide virtual and
    swap memory statistics (all sizes converted to MiB), logs them, and
    returns them as a JSON-serializable dictionary.

    Args:
        round_number (int): The current round number for which memory
            usage is being logged.
        metric_origin (str): Identifier of the reporting component
            (e.g. "aggregator"), recorded in the returned dictionary
            and in the log output.

    Returns:
        dict: Memory usage details for this round.
    """

    def to_mib(num_bytes):
        # Convert a byte count to MiB, rounded to two decimal places.
        return round(num_bytes / (1024**2), 2)

    process = psutil.Process()
    # Log the numeric pid, not the Process object's repr.
    self.logger.info(f"{metric_origin} process id is {process.pid}")
    virtual_memory = psutil.virtual_memory()
    swap_memory = psutil.swap_memory()
    memory_usage = {
        "round_number": round_number,
        "metric_origin": metric_origin,
        "process_memory": to_mib(process.memory_info().rss),
        "virtual_memory": {
            "total": to_mib(virtual_memory.total),
            "available": to_mib(virtual_memory.available),
            "percent": virtual_memory.percent,
            "used": to_mib(virtual_memory.used),
            "free": to_mib(virtual_memory.free),
            # The fields below are Linux-specific in psutil; default to 0
            # so this does not raise AttributeError on other platforms.
            "active": to_mib(getattr(virtual_memory, "active", 0)),
            "inactive": to_mib(getattr(virtual_memory, "inactive", 0)),
            "buffers": to_mib(getattr(virtual_memory, "buffers", 0)),
            "cached": to_mib(getattr(virtual_memory, "cached", 0)),
            "shared": to_mib(getattr(virtual_memory, "shared", 0)),
        },
        "swap_memory": {
            "total": to_mib(swap_memory.total),
            "used": to_mib(swap_memory.used),
            "free": to_mib(swap_memory.free),
            "percent": swap_memory.percent,
        },
    }
    self.logger.info(
        f"**************** End of round check: {metric_origin} Memory Logs ******************"
    )
    self.logger.info("Memory Usage: %s", memory_usage)
    self.logger.info(
        "*************************************************************************************"
    )

    return memory_usage

def _end_of_round_with_stragglers_check(self):
"""
Checks if the minimum required collaborators have reported their results,
Expand Down Expand Up @@ -966,6 +1022,11 @@ def _end_of_round_check(self):
for task_name in all_tasks:
self._compute_validation_related_task_metrics(task_name)

if self.log_memory_usage:
# This is the place to check the memory usage of the aggregator
memory_detail = self.get_memory_usage(self.round_number, "aggregator")
self.memory_details.append(memory_detail)

# Once all of the task results have been processed
self._end_of_round_check_done[self.round_number] = True

Expand All @@ -981,6 +1042,11 @@ def _end_of_round_check(self):

# TODO This needs to be fixed!
if self._time_to_quit():
# Write self.memory_details to a file
if self.log_memory_usage:
self.logger.info("Writing memory details to file...")
with open(AGG_MEM_FILE_NAME, "w") as f:
json.dump(self.memory_details, f, indent=4)
self.logger.info("Experiment Completed. Cleaning up...")
else:
self.logger.info("Starting round %s...", self.round_number)
Expand Down
68 changes: 66 additions & 2 deletions openfl/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@


"""Collaborator module."""

import json
from enum import Enum
from logging import getLogger
from time import sleep
from typing import Tuple

import psutil

from openfl.databases import TensorDB
from openfl.pipelines import NoCompressionPipeline, TensorCodec
from openfl.protocols import utils
Expand Down Expand Up @@ -80,6 +82,7 @@ def __init__(
delta_updates=False,
compression_pipeline=None,
db_store_rounds=1,
log_memory_usage=False,
**kwargs,
):
"""Initialize the Collaborator object.
Expand Down Expand Up @@ -123,7 +126,9 @@ def __init__(
self.delta_updates = delta_updates

self.client = client

self.log_memory_usage = (
log_memory_usage  # Flag can be enabled to get memory usage details on Ubuntu systems
)
self.task_config = task_config

self.logger = getLogger(__name__)
Expand Down Expand Up @@ -158,6 +163,7 @@ def set_available_devices(self, cuda: Tuple[str] = ()):

def run(self):
"""Run the collaborator."""
memory_details = []
while True:
tasks, round_number, sleep_time, time_to_quit = self.get_tasks()
if time_to_quit:
Expand All @@ -171,6 +177,16 @@ def run(self):

# Cleaning tensor db
self.tensor_db.clean_up(self.db_store_rounds)
if self.log_memory_usage:
# This is the place to check the memory usage of the collaborator
memory_detail = self.get_memory_usage(
round_number, metric_origin=self.collaborator_name
)
memory_details.append(memory_detail)
if self.log_memory_usage:
# Write json file with memory usage details and collaborator name
with open(f"{self.collaborator_name}_mem_details.json", "w") as f:
json.dump(memory_details, f, indent=4)

self.logger.info("End of Federation reached. Exiting...")

Expand Down Expand Up @@ -588,3 +604,51 @@ def named_tensor_to_nparray(self, named_tensor):
self.tensor_db.cache_tensor({decompressed_tensor_key: decompressed_nparray})

return decompressed_nparray

def get_memory_usage(self, round_number, metric_origin):
    """Collect and log memory usage statistics for the given round.

    Uses psutil to gather the process RSS plus system-wide virtual and
    swap memory statistics (all sizes converted to MiB), logs them, and
    returns them as a JSON-serializable dictionary.

    Args:
        round_number (int): The current round number for which memory
            usage is being logged.
        metric_origin (str): Identifier of the reporting component
            (e.g. the collaborator name), recorded in the returned
            dictionary and in the log output.

    Returns:
        dict: Memory usage details for this round.
    """

    def to_mib(num_bytes):
        # Convert a byte count to MiB, rounded to two decimal places.
        return round(num_bytes / (1024**2), 2)

    process = psutil.Process()
    # Log the numeric pid, not the Process object's repr.
    self.logger.info(f"{metric_origin} process id is {process.pid}")
    virtual_memory = psutil.virtual_memory()
    swap_memory = psutil.swap_memory()
    memory_usage = {
        "round_number": round_number,
        "metric_origin": metric_origin,
        "process_memory": to_mib(process.memory_info().rss),
        "virtual_memory": {
            "total": to_mib(virtual_memory.total),
            "available": to_mib(virtual_memory.available),
            "percent": virtual_memory.percent,
            "used": to_mib(virtual_memory.used),
            "free": to_mib(virtual_memory.free),
            # The fields below are Linux-specific in psutil; default to 0
            # so this does not raise AttributeError on other platforms.
            "active": to_mib(getattr(virtual_memory, "active", 0)),
            "inactive": to_mib(getattr(virtual_memory, "inactive", 0)),
            "buffers": to_mib(getattr(virtual_memory, "buffers", 0)),
            "cached": to_mib(getattr(virtual_memory, "cached", 0)),
            "shared": to_mib(getattr(virtual_memory, "shared", 0)),
        },
        "swap_memory": {
            "total": to_mib(swap_memory.total),
            "used": to_mib(swap_memory.used),
            "free": to_mib(swap_memory.free),
            "percent": swap_memory.percent,
        },
    }
    self.logger.info(
        f"**************** End of round check: {metric_origin} Memory Logs ******************"
    )
    self.logger.info("Memory Usage: %s", memory_usage)
    self.logger.info(
        "*************************************************************************************"
    )

    return memory_usage
14 changes: 11 additions & 3 deletions tests/end_to_end/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# Define a named tuple to store the objects for model owner, aggregator, and collaborators
federation_fixture = collections.namedtuple(
"federation_fixture",
"model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir",
"model_owner, aggregator, collaborators, model_name, disable_client_auth, disable_tls, workspace_path, results_dir, num_rounds",
)


Expand Down Expand Up @@ -62,6 +62,11 @@ def pytest_addoption(parser):
action="store_true",
help="Disable TLS for communication",
)
parser.addoption(
"--log_memory_usage",
action="store_true",
help="Enable memory log in collaborators and aggregator",
)


@pytest.fixture(scope="session", autouse=True)
Expand Down Expand Up @@ -234,14 +239,16 @@ def fx_federation(request, pytestconfig):
num_rounds = args.num_rounds
disable_client_auth = args.disable_client_auth
disable_tls = args.disable_tls
log_memory_usage = args.log_memory_usage

log.info(
f"Running federation setup using Task Runner API on single machine with below configurations:\n"
f"\tNumber of collaborators: {num_collaborators}\n"
f"\tNumber of rounds: {num_rounds}\n"
f"\tModel name: {model_name}\n"
f"\tClient authentication: {not disable_client_auth}\n"
f"\tTLS: {not disable_tls}"
f"\tTLS: {not disable_tls}\n"
f"\tMemory Logs: {log_memory_usage}"
)

# Validate the model name and create the workspace name
Expand All @@ -251,7 +258,7 @@ def fx_federation(request, pytestconfig):
workspace_name = f"workspace_{model_name}"

# Create model owner object and the workspace for the model
model_owner = participants.ModelOwner(workspace_name, model_name)
model_owner = participants.ModelOwner(workspace_name, model_name, log_memory_usage)
try:
workspace_path = model_owner.create_workspace(results_dir=results_dir)
except Exception as e:
Expand Down Expand Up @@ -318,4 +325,5 @@ def fx_federation(request, pytestconfig):
disable_tls=disable_tls,
workspace_path=workspace_path,
results_dir=results_dir,
num_rounds=num_rounds,
)
8 changes: 7 additions & 1 deletion tests/end_to_end/models/participants.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ class ModelOwner:
4. Importing and exporting the workspace etc.
"""

def __init__(self, workspace_name, model_name):
def __init__(self, workspace_name, model_name, log_memory_usage):
"""
Initialize the ModelOwner class
Args:
workspace_name (str): Workspace name
model_name (str): Model name
log_memory_usage (bool): Memory Log flag
"""
self.workspace_name = workspace_name
self.model_name = model_name
Expand All @@ -38,6 +39,7 @@ def __init__(self, workspace_name, model_name):
self.plan_path = None
self.num_collaborators = constants.NUM_COLLABORATORS
self.rounds_to_train = constants.NUM_ROUNDS
self.log_memory_usage = log_memory_usage

def create_workspace(self, results_dir=None):
"""
Expand Down Expand Up @@ -132,6 +134,10 @@ def modify_plan(self, new_rounds=None, num_collaborators=None, disable_client_au
data = yaml.load(fp, Loader=yaml.FullLoader)

data["aggregator"]["settings"]["rounds_to_train"] = int(self.rounds_to_train)
# Memory Leak related
data["aggregator"]["settings"]["log_memory_usage"] = self.log_memory_usage
data["collaborator"]["settings"]["log_memory_usage"] = self.log_memory_usage

data["data_loader"]["settings"]["collaborator_count"] = int(self.num_collaborators)
data["network"]["settings"]["disable_client_auth"] = disable_client_auth
data["network"]["settings"]["tls"] = not disable_tls
Expand Down
2 changes: 2 additions & 0 deletions tests/end_to_end/utils/conftest_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def parse_arguments():
- model_name (str, default="torch_cnn_mnist"): Model name
- disable_client_auth (bool): Disable client authentication
- disable_tls (bool): Disable TLS for communication
- log_memory_usage (bool): Enable Memory leak logs

Raises:
SystemExit: If the required arguments are not provided or if any argument parsing error occurs.
Expand All @@ -32,6 +33,7 @@ def parse_arguments():
parser.add_argument("--model_name", type=str, help="Model name")
parser.add_argument("--disable_client_auth", action="store_true", help="Disable client authentication")
parser.add_argument("--disable_tls", action="store_true", help="Disable TLS for communication")
parser.add_argument("--log_memory_usage", action="store_true", help="Enable Memory leak logs")
args = parser.parse_known_args()[0]
return args

Expand Down
5 changes: 3 additions & 2 deletions tests/end_to_end/utils/federation_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def verify_federation_run_completion(fed_obj, results):
executor.submit(
_verify_completion_for_participant,
participant,
fed_obj.num_rounds,
results[i]
)
for i, participant in enumerate(fed_obj.collaborators + [fed_obj.aggregator])
Expand All @@ -99,7 +100,7 @@ def verify_federation_run_completion(fed_obj, results):
return all(results)


def _verify_completion_for_participant(participant, result_file):
def _verify_completion_for_participant(participant, num_rounds, result_file, time_for_each_round=100):
"""
Verify the completion of the process for the participant
Args:
Expand All @@ -109,7 +110,7 @@ def _verify_completion_for_participant(participant, result_file):
bool: True if successful, else False
"""
# Wait for the successful output message to appear in the log till timeout
timeout = 900 # in seconds
timeout = 300 + ( time_for_each_round * num_rounds ) # in seconds
log.info(f"Printing the last line of the log file for {participant.name} to track the progress")
with open(result_file, 'r') as file:
content = file.read()
Expand Down
Loading