From 11ba014a61e79ff64851e6794adaffe42c432eb4 Mon Sep 17 00:00:00 2001 From: Feda Curic Date: Fri, 15 Dec 2023 14:04:52 +0100 Subject: [PATCH] Implement adaptive batch size --- pyproject.toml | 1 + src/ert/analysis/_es_update.py | 52 ++++++++++++++++++++++++++++------ 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 6c5a6b28cbe..e239e87bbd2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -135,6 +135,7 @@ types = [ "types-decorator", "types-docutils", "types-tqdm", + "types-psutil" ] [tool.setuptools] diff --git a/src/ert/analysis/_es_update.py b/src/ert/analysis/_es_update.py index c86f8df5073..8e07c3fc8c7 100644 --- a/src/ert/analysis/_es_update.py +++ b/src/ert/analysis/_es_update.py @@ -21,6 +21,7 @@ import iterative_ensemble_smoother as ies import numpy as np +import psutil import xarray as xr from iterative_ensemble_smoother.experimental import ( AdaptiveESMDA, @@ -593,15 +594,41 @@ def analysis_ES( ) if module.localization: num_params = temp_storage[param_group.name].shape[0] - batch_size = min(1000, num_params) - batches = _split_by_batchsize(np.arange(0, num_params), batch_size) - progress_callback( - AnalysisStatusEvent( - msg=f"Running localization on {num_params} parameters,{num_obs} responses, {ensemble_size} realizations and {len(batches)} batches" - ) + # Calculate adaptive batch size. + # Adaptive Localization calculates the cross-covariance between + # parameters and responses. + # Cross-covariance is a matrix with shape num_params x num_obs + # which may be larger than memory. + + # From `psutil` documentation: + # - available: + # the memory that can be given instantly to processes without the + # system going into swap. + # This is calculated by summing different memory values depending + # on the platform and it is supposed to be used to monitor actual + # memory usage in a cross platform fashion. + available_memory_bytes = psutil.virtual_memory().available + memory_safety_factor = 0.8 + bytes_in_float64 = 8 + batch_size = min( + int( + np.floor( + available_memory_bytes + * memory_safety_factor + / (num_obs * bytes_in_float64) + ) + ), + num_params, ) + batches = _split_by_batchsize(np.arange(0, num_params), batch_size) + + log_msg = f"Running localization on {num_params} parameters, {num_obs} responses, {ensemble_size} realizations and {len(batches)} batches" + _logger.info(log_msg) + progress_callback(AnalysisStatusEvent(msg=log_msg)) + + start = time.time() for param_batch_idx in TimedIterator(batches, progress_callback): X_local = temp_storage[param_group.name][param_batch_idx, :] temp_storage[param_group.name][ @@ -615,6 +642,9 @@ def analysis_ES( cov_YY=cov_YY, verbose=False, ) + _logger.info( + f"Adaptive Localization of {param_group} completed in {(time.time() - start) / 60} minutes" + ) else: # Use low-level ies API to allow looping over parameters @@ -632,10 +662,14 @@ def analysis_ES( # Update manually using global transition matrix T temp_storage[param_group.name] = X_local @ T - progress_callback( - AnalysisStatusEvent(msg=f"Storing data for {param_group.name}..") - ) + log_msg = f"Storing data for {param_group.name}.." + _logger.info(log_msg) + progress_callback(AnalysisStatusEvent(msg=log_msg)) + start = time.time() _save_temp_storage_to_disk(target_fs, temp_storage, iens_active_index) + _logger.info( + f"Storing data for {param_group.name} completed in {(time.time() - start) / 60} minutes" + ) # Finally, if some parameter groups have not been updated we need to copy the parameters # from the parent ensemble.