Rename array_name to name in runtime following #337
tomwhite committed Feb 12, 2024
1 parent 9ad8fbe commit e99b0da
Showing 10 changed files with 15 additions and 17 deletions.
10 changes: 5 additions & 5 deletions cubed/extensions/history.py
@@ -15,7 +15,7 @@ def on_compute_start(self, event):
primitive_op = node["primitive_op"]
plan.append(
dict(
- array_name=name,
+ name=name,
op_name=node["op_name"],
projected_mem=primitive_op.projected_mem,
reserved_mem=primitive_op.reserved_mem,
@@ -50,7 +50,7 @@ def analyze(plan_df, events_df):
plan_df["reserved_mem_mb"] = plan_df["reserved_mem"] / 1_000_000
plan_df = plan_df[
[
"array_name",
"name",
"op_name",
"projected_mem_mb",
"reserved_mem_mb",
@@ -68,7 +68,7 @@ def analyze(plan_df, events_df):
)

# find per-array stats
- df = events_df.groupby("array_name", as_index=False).agg(
+ df = events_df.groupby("name", as_index=False).agg(
{
"peak_measured_mem_start_mb": ["min", "mean", "max"],
"peak_measured_mem_end_mb": ["max"],
@@ … @@

# flatten multi-index
df.columns = ["_".join(a).rstrip("_") for a in df.columns.to_flat_index()]
- df = df.merge(plan_df, on="array_name")
+ df = df.merge(plan_df, on="name")

def projected_mem_utilization(row):
return row["peak_measured_mem_end_mb_max"] / row["projected_mem_mb"]
@@ -88,7 +88,7 @@ def projected_mem_utilization(row):
)
df = df[
[
"array_name",
"name",
"op_name",
"num_tasks",
"peak_measured_mem_start_mb_max",
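Note on the hunks above: analyze() now groups the event stats by the renamed "name" column, flattens the aggregated multi-index, and merges with the plan on the same key. A minimal self-contained sketch of that groupby/flatten step — the column names are taken from the diff, but the data values are made up for illustration:

import pandas as pd

# Toy events keyed by the renamed "name" column (illustrative values only).
events_df = pd.DataFrame(
    {
        "name": ["op-001", "op-001", "op-002"],
        "peak_measured_mem_start_mb": [10.0, 12.0, 8.0],
        "peak_measured_mem_end_mb": [20.0, 25.0, 15.0],
    }
)

# Same shape of aggregation as analyze() in the diff above.
df = events_df.groupby("name", as_index=False).agg(
    {
        "peak_measured_mem_start_mb": ["min", "mean", "max"],
        "peak_measured_mem_end_mb": ["max"],
    }
)

# Flatten the multi-index produced by agg; rstrip("_") restores the bare
# "name" group key so the later merge(plan_df, on="name") works.
df.columns = ["_".join(a).rstrip("_") for a in df.columns.to_flat_index()]
print(df.columns.tolist())
# ['name', 'peak_measured_mem_start_mb_min', 'peak_measured_mem_start_mb_mean',
#  'peak_measured_mem_start_mb_max', 'peak_measured_mem_end_mb_max']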
4 changes: 1 addition & 3 deletions cubed/extensions/timeline.py
@@ -31,9 +31,7 @@ def on_compute_end(self, event):
def create_timeline(stats, start_tstamp, end_tstamp, dst=None):
stats_df = pd.DataFrame(stats)

- stats_df = stats_df.sort_values(
-     by=["task_create_tstamp", "array_name"], ascending=True
- )
+ stats_df = stats_df.sort_values(by=["task_create_tstamp", "name"], ascending=True)

total_calls = len(stats_df)

2 changes: 1 addition & 1 deletion cubed/extensions/tqdm.py
@@ -31,7 +31,7 @@ def on_compute_end(self, event):
pbar.close()

def on_task_end(self, event):
- self.pbars[event.array_name].update(event.num_tasks)
+ self.pbars[event.name].update(event.num_tasks)


@contextlib.contextmanager
2 changes: 1 addition & 1 deletion cubed/runtime/executors/asyncio.py
@@ -58,7 +58,7 @@ async def async_map_unordered(
if return_stats:
result, stats = task.result()
if name is not None:
stats["array_name"] = name
stats["name"] = name
stats["task_create_tstamp"] = task_create_tstamp
yield result, stats
else:
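The same tagging step recurs in the coiled, modal, and modal_async executors below: when an operation name is known, it is written into the stats dict (previously under "array_name") along with the task creation timestamp before callbacks see it. A minimal sketch of that step — tag_stats is a hypothetical helper for illustration, not part of cubed:

import time

def tag_stats(stats, name, task_create_tstamp):
    # Attach the operation name (renamed from "array_name" in this commit)
    # and the task creation timestamp before handing stats to callbacks.
    if name is not None:
        stats["name"] = name
    stats["task_create_tstamp"] = task_create_tstamp
    return stats

print(sorted(tag_stats({}, "op-001", time.time())))
# ['name', 'task_create_tstamp']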
2 changes: 1 addition & 1 deletion cubed/runtime/executors/coiled.py
@@ -35,5 +35,5 @@ def execute_dag(
for _, stats in coiled_function.map(input, config=pipeline.config):
if callbacks is not None:
if name is not None:
stats["array_name"] = name
stats["name"] = name
handle_callbacks(callbacks, stats)
2 changes: 1 addition & 1 deletion cubed/runtime/executors/lithops.py
@@ -221,7 +221,7 @@ def execute_dag(
def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]:
stats = future.stats
return dict(
- array_name=future.group_name,
+ name=future.group_name,
task_create_tstamp=stats["host_job_create_tstamp"],
function_start_tstamp=stats["worker_func_start_tstamp"],
function_end_tstamp=stats["worker_func_end_tstamp"],
2 changes: 1 addition & 1 deletion cubed/runtime/executors/modal.py
@@ -136,7 +136,7 @@ def execute_dag(
order_outputs=False,
kwargs=dict(func=pipeline.function, config=pipeline.config),
):
stats["array_name"] = name
stats["name"] = name
stats["task_create_tstamp"] = task_create_tstamp
handle_callbacks(callbacks, stats)

2 changes: 1 addition & 1 deletion cubed/runtime/executors/modal_async.py
@@ -51,7 +51,7 @@ async def map_unordered(
if return_stats:
result, stats = result
if name is not None:
stats["array_name"] = name
stats["name"] = name
stats["task_create_tstamp"] = task_create_tstamp
yield result, stats
else:
2 changes: 1 addition & 1 deletion cubed/runtime/executors/python.py
@@ -28,5 +28,5 @@ def execute_dag(
for m in pipeline.mappable:
exec_stage_func(pipeline.function, m, config=pipeline.config)
if callbacks is not None:
- event = TaskEndEvent(array_name=name)
+ event = TaskEndEvent(name=name)
[callback.on_task_end(event) for callback in callbacks]
4 changes: 2 additions & 2 deletions cubed/runtime/types.py
@@ -47,8 +47,8 @@ class ComputeEndEvent:
class TaskEndEvent:
"""Callback information about a completed task (or tasks)."""

- array_name: str
- """Name of the array that the task is for."""
+ name: str
+ """Name of the operation that the task is for."""

num_tasks: int = 1
"""Number of tasks that this event applies to (default 1)."""
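Taken together with the python.py and tqdm.py hunks above, the renamed field flows like this: executors construct TaskEndEvent(name=...) and callbacks read event.name. A minimal runnable sketch under those assumptions — the real TaskEndEvent in cubed/runtime/types.py carries more fields than shown, and CountingCallback is a hypothetical callback written for illustration:

from dataclasses import dataclass

@dataclass
class TaskEndEvent:
    name: str  # name of the operation the task is for (was: array_name)
    num_tasks: int = 1

class CountingCallback:
    # Hypothetical callback implementing the on_task_end hook used by
    # the executors in this commit.
    def __init__(self):
        self.counts = {}

    def on_task_end(self, event):
        self.counts[event.name] = self.counts.get(event.name, 0) + event.num_tasks

callbacks = [CountingCallback()]
event = TaskEndEvent(name="op-001")  # executors now pass name=..., not array_name=...
[callback.on_task_end(event) for callback in callbacks]
print(callbacks[0].counts)  # {'op-001': 1}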
