diff --git a/cubed/extensions/history.py b/cubed/extensions/history.py
index af12e1a9..8003b8b1 100644
--- a/cubed/extensions/history.py
+++ b/cubed/extensions/history.py
@@ -15,7 +15,7 @@ def on_compute_start(self, event):
             primitive_op = node["primitive_op"]
             plan.append(
                 dict(
-                    array_name=name,
+                    name=name,
                     op_name=node["op_name"],
                     projected_mem=primitive_op.projected_mem,
                     reserved_mem=primitive_op.reserved_mem,
@@ -50,7 +50,7 @@ def analyze(plan_df, events_df):
     plan_df["reserved_mem_mb"] = plan_df["reserved_mem"] / 1_000_000
     plan_df = plan_df[
         [
-            "array_name",
+            "name",
             "op_name",
             "projected_mem_mb",
             "reserved_mem_mb",
@@ -68,7 +68,7 @@ def analyze(plan_df, events_df):
     )

     # find per-array stats
-    df = events_df.groupby("array_name", as_index=False).agg(
+    df = events_df.groupby("name", as_index=False).agg(
         {
             "peak_measured_mem_start_mb": ["min", "mean", "max"],
             "peak_measured_mem_end_mb": ["max"],
@@ -78,7 +78,7 @@ def analyze(plan_df, events_df):
     # flatten multi-index
     df.columns = ["_".join(a).rstrip("_") for a in df.columns.to_flat_index()]

-    df = df.merge(plan_df, on="array_name")
+    df = df.merge(plan_df, on="name")

     def projected_mem_utilization(row):
         return row["peak_measured_mem_end_mb_max"] / row["projected_mem_mb"]
@@ -88,7 +88,7 @@ def projected_mem_utilization(row):
     )
     df = df[
         [
-            "array_name",
+            "name",
             "op_name",
             "num_tasks",
             "peak_measured_mem_start_mb_max",
diff --git a/cubed/extensions/timeline.py b/cubed/extensions/timeline.py
index 9cc40491..2e9061e3 100644
--- a/cubed/extensions/timeline.py
+++ b/cubed/extensions/timeline.py
@@ -31,9 +31,7 @@ def on_compute_end(self, event):

 def create_timeline(stats, start_tstamp, end_tstamp, dst=None):
     stats_df = pd.DataFrame(stats)
-    stats_df = stats_df.sort_values(
-        by=["task_create_tstamp", "array_name"], ascending=True
-    )
+    stats_df = stats_df.sort_values(by=["task_create_tstamp", "name"], ascending=True)

     total_calls = len(stats_df)
diff --git a/cubed/extensions/tqdm.py b/cubed/extensions/tqdm.py
index 72796147..5f1dc015 100644
--- a/cubed/extensions/tqdm.py
+++ b/cubed/extensions/tqdm.py
@@ -31,7 +31,7 @@ def on_compute_end(self, event):
             pbar.close()

     def on_task_end(self, event):
-        self.pbars[event.array_name].update(event.num_tasks)
+        self.pbars[event.name].update(event.num_tasks)


 @contextlib.contextmanager
diff --git a/cubed/runtime/executors/asyncio.py b/cubed/runtime/executors/asyncio.py
index 0ab20900..0de3fabc 100644
--- a/cubed/runtime/executors/asyncio.py
+++ b/cubed/runtime/executors/asyncio.py
@@ -58,7 +58,7 @@ async def async_map_unordered(
                 if return_stats:
                     result, stats = task.result()
                     if name is not None:
-                        stats["array_name"] = name
+                        stats["name"] = name
                     stats["task_create_tstamp"] = task_create_tstamp
                     yield result, stats
                 else:
diff --git a/cubed/runtime/executors/coiled.py b/cubed/runtime/executors/coiled.py
index bd0e111b..c60b63c1 100644
--- a/cubed/runtime/executors/coiled.py
+++ b/cubed/runtime/executors/coiled.py
@@ -35,5 +35,5 @@ def execute_dag(
     for _, stats in coiled_function.map(input, config=pipeline.config):
         if callbacks is not None:
             if name is not None:
-                stats["array_name"] = name
+                stats["name"] = name
             handle_callbacks(callbacks, stats)
diff --git a/cubed/runtime/executors/lithops.py b/cubed/runtime/executors/lithops.py
index 25d9b93a..79d594f8 100644
--- a/cubed/runtime/executors/lithops.py
+++ b/cubed/runtime/executors/lithops.py
@@ -221,7 +221,7 @@ def execute_dag(
 def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]:
     stats = future.stats
     return dict(
-        array_name=future.group_name,
+        name=future.group_name,
         task_create_tstamp=stats["host_job_create_tstamp"],
         function_start_tstamp=stats["worker_func_start_tstamp"],
         function_end_tstamp=stats["worker_func_end_tstamp"],
diff --git a/cubed/runtime/executors/modal.py b/cubed/runtime/executors/modal.py
index fbb265b4..c00f2057 100644
--- a/cubed/runtime/executors/modal.py
+++ b/cubed/runtime/executors/modal.py
@@ -136,7 +136,7 @@ def execute_dag(
             order_outputs=False,
             kwargs=dict(func=pipeline.function, config=pipeline.config),
         ):
-            stats["array_name"] = name
+            stats["name"] = name
             stats["task_create_tstamp"] = task_create_tstamp
             handle_callbacks(callbacks, stats)
diff --git a/cubed/runtime/executors/modal_async.py b/cubed/runtime/executors/modal_async.py
index 91725336..a9e0ac1c 100644
--- a/cubed/runtime/executors/modal_async.py
+++ b/cubed/runtime/executors/modal_async.py
@@ -51,7 +51,7 @@ async def map_unordered(
             if return_stats:
                 result, stats = result
                 if name is not None:
-                    stats["array_name"] = name
+                    stats["name"] = name
                 stats["task_create_tstamp"] = task_create_tstamp
                 yield result, stats
             else:
diff --git a/cubed/runtime/executors/python.py b/cubed/runtime/executors/python.py
index e55f712c..4edfeb37 100644
--- a/cubed/runtime/executors/python.py
+++ b/cubed/runtime/executors/python.py
@@ -28,5 +28,5 @@ def execute_dag(
     for m in pipeline.mappable:
         exec_stage_func(pipeline.function, m, config=pipeline.config)
         if callbacks is not None:
-            event = TaskEndEvent(array_name=name)
+            event = TaskEndEvent(name=name)
             [callback.on_task_end(event) for callback in callbacks]
diff --git a/cubed/runtime/types.py b/cubed/runtime/types.py
index 898c244e..d7e46b62 100644
--- a/cubed/runtime/types.py
+++ b/cubed/runtime/types.py
@@ -47,8 +47,8 @@ class ComputeEndEvent:
 class TaskEndEvent:
     """Callback information about a completed task (or tasks)."""

-    array_name: str
-    """Name of the array that the task is for."""
+    name: str
+    """Name of the operation that the task is for."""

     num_tasks: int = 1
     """Number of tasks that this event applies to (default 1)."""
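
Not part of the diff: a minimal, self-contained sketch of how callback code keys on the renamed field after this change. The TaskEndEvent mirror below is copied from cubed/runtime/types.py as patched above; the per-operation counter and the operation name "op-001" are made up for illustration.

from dataclasses import dataclass
from typing import Dict


@dataclass
class TaskEndEvent:
    """Callback information about a completed task (or tasks)."""

    name: str
    """Name of the operation that the task is for."""

    num_tasks: int = 1
    """Number of tasks that this event applies to (default 1)."""


# Tally completed tasks per operation, keyed on the renamed ``name`` field,
# in the same way TqdmProgressBar.on_task_end keys its progress bars.
completed: Dict[str, int] = {}


def on_task_end(event: TaskEndEvent) -> None:
    completed[event.name] = completed.get(event.name, 0) + event.num_tasks


# "op-001" is a hypothetical operation name.
on_task_end(TaskEndEvent(name="op-001", num_tasks=3))
assert completed["op-001"] == 3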