def get_final_results_lazy(simulations: Generator,
                           psus: "List[StateUpdateBlocks]",
                           eps,
                           sessions: "List[SessionDict]",
                           remote_threshold: int):
    """Assemble the executor output without materialising it in memory.

    Lazy counterpart of ``get_final_results``: instead of building lists,
    it returns iterables that produce their items on demand.

    Args:
        simulations: Generator produced by the executor in lazy mode. Each
            item is passed through ``lazy_flatten`` twice and the results
            are chained into a single stream.
        psus: Partial state update blocks, paired element-wise with ``eps``.
        eps: Second argument for ``create_tensor_field``, one per entry
            of ``psus``.
        sessions: Session descriptors; returned unchanged.
        remote_threshold: Not used by this function; accepted so the
            signature mirrors ``get_final_results``.

    Returns:
        A ``(simulations, tensor_fields, sessions)`` tuple in which the first
        two members are single-pass iterables rather than lists.

    Raises:
        ValueError: If ``simulations`` is not a generator.
    """
    if not isinstance(simulations, Generator):
        raise ValueError(
            'Invalid simulation results (Executor output is not a Generator required for lazy execution)')

    # zip() yields (psu, ep) pairs, but create_tensor_field is called with two
    # positional arguments everywhere else in this module, so each pair has to
    # be unpacked (starmap) rather than handed over as a single tuple (map).
    # NOTE(review): the eager code paths keep one tensor field per
    # configuration without chaining them together -- confirm that flattening
    # them here is intended.
    tensor_fields = itertools.chain.from_iterable(
        itertools.starmap(create_tensor_field, zip(psus, eps)))

    # Two levels of nesting are removed per simulation, lazily.
    flat_simulations = map(lazy_flatten, map(lazy_flatten, simulations))

    # The result is an iterable, not a list: it can be consumed only once.
    iterable_flat_simulations = itertools.chain.from_iterable(flat_simulations)

    return iterable_flat_simulations, tensor_fields, sessions
List[StateUpdateBlocks], eps, sessions: List[SessionDict], remote_threshold: int): - + # if list of lists of lists of dicts: do flatten # if list of dicts: do not flatetn # else raise error - init: bool = isinstance(simulations, Sequence) failed_1 = False failed_2 = False - + try: init: bool = isinstance(simulations, Sequence) dont_flatten = init & isinstance(simulations[0], Mapping) @@ -174,8 +200,8 @@ def get_final_results(simulations: List[StateHistory], do_flatten = False if failed_1 and failed_2: - raise ValueError('Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])') - + raise ValueError( + 'Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])') flat_timesteps, tensor_fields = [], [] for sim_result, psu, ep in tqdm(list(zip(simulations, psus, eps)), @@ -184,7 +210,7 @@ def get_final_results(simulations: List[StateHistory], if do_flatten: flat_timesteps.append(flatten(sim_result)) tensor_fields.append(create_tensor_field(psu, ep)) - + if do_flatten: flat_simulations = flatten(flat_timesteps) else: @@ -209,15 +235,19 @@ def get_final_results(simulations: List[StateHistory], else: raise ValueError("Invalid execution mode specified") - print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs ) - final_result = get_final_results( - simulations_results, partial_state_updates, eps, sessions, remote_threshold) + if (self.additional_objs is not None and self.additional_objs['lazy_eval']): + final_result = get_final_results_lazy( + simulations_results, partial_state_updates, eps, sessions, remote_threshold) + else: + final_result = get_final_results( + simulations_results, partial_state_updates, eps, sessions, remote_threshold) + elif self.exec_context == ExecutionMode.distributed: 
def process_executor(params):
    """Run one simulation from a packed parameter tuple.

    Intended as the callable handed to ``pool.map``, which passes a single
    argument per task.

    Args:
        params: An 11-tuple ``(simulation_exec, var_dict, states_list, config,
            env_processes, T, sim_id, N, subset_id, subset_window,
            configured_n)``. The first element is called with the remaining
            ten, in that order.

    Returns:
        A one-element list holding the value returned by ``simulation_exec``.
    """
    (simulation_exec, var_dict, states_list, config, env_processes,
     T, sim_id, N, subset_id, subset_window, configured_n) = params

    return [simulation_exec(
        var_dict, states_list, config, env_processes, T, sim_id, N,
        subset_id, subset_window, configured_n
    )]


def _discard_temp_file(path: str) -> None:
    """Delete ``path``; a file that is already gone is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def file_handler_inc(filenames: List[str]) -> Generator[List, None, None]:
    """Yield the results stored in ``filenames`` one at a time, deleting each file.

    Every file is removed as soon as it has been read, and any files not yet
    reached are removed when iteration stops early (the consumer closes or
    drops the generator) or when loading fails. Cleanup therefore does not
    depend on the consumer exhausting the generator.

    Args:
        filenames: Paths of the temporary result files, e.g. the names
            returned by ``process_executor_disk``. Presumably each holds one
            dill-serialised result -- the writing side is not visible here.

    Yields:
        The object loaded from each file, in the order given.
    """
    remaining = iter(filenames)
    try:
        for file_name in remaining:
            try:
                # 'rb': the payload is binary.
                # NOTE: dill.load can execute arbitrary code while
                # deserialising; only feed it files written by our own workers.
                with open(file_name, 'rb') as f:
                    result = dill.load(f)
            finally:
                # The data is in memory (or unreadable) at this point, so the
                # file is no longer needed whether or not the consumer resumes.
                _discard_temp_file(file_name)
            yield result
    finally:
        # Runs on normal exhaustion (nothing left), on generator close, and on
        # errors: make sure no temporary file outlives the iteration.
        for leftover in remaining:
            _discard_temp_file(leftover)
@@ -66,7 +67,10 @@ def easy_run( _exec_mode = ExecutionMode().local_mode elif exec_mode == 'single': _exec_mode = ExecutionMode().single_mode - exec_context = ExecutionContext(_exec_mode, additional_objs={'deepcopy_off': deepcopy_off}) + exec_context = ExecutionContext(_exec_mode, additional_objs={ + 'deepcopy_off': deepcopy_off, + 'lazy_eval': lazy_eval + }) executor = Executor(exec_context=exec_context, configs=configs) # Execute the cadCAD experiment