From 9490d6c6216c9998804c07063517b3cce2b95eac Mon Sep 17 00:00:00 2001 From: Chris Pickett Date: Fri, 22 Nov 2024 09:55:28 -0500 Subject: [PATCH] Implement `labels` field on Flow, Deployment, FlowRun, and TaskRun (#16050) --- docs/v3/api-ref/rest-api/server/schema.json | 292 ++++++++++++++++++ src/prefect/client/schemas/actions.py | 5 + src/prefect/client/schemas/objects.py | 11 +- src/prefect/client/schemas/responses.py | 3 + .../database/migrations/MIGRATION-NOTES.md | 4 + ...428d_add_labels_column_to_flow_flowrun_.py | 59 ++++ ...8b51_add_labels_column_to_flow_flowrun_.py | 70 +++++ src/prefect/server/database/orm_models.py | 12 + src/prefect/server/models/deployments.py | 25 +- src/prefect/server/models/flow_runs.py | 53 +++- src/prefect/server/models/flows.py | 11 + src/prefect/server/models/task_runs.py | 27 +- src/prefect/server/schemas/actions.py | 20 ++ src/prefect/server/schemas/core.py | 25 ++ src/prefect/server/schemas/responses.py | 3 + src/prefect/types/__init__.py | 24 +- .../test_task_run_state_change_events.py | 7 + tests/server/models/test_deployments.py | 50 +++ .../orchestration/api/test_deployments.py | 9 + .../orchestration/api/test_flow_runs.py | 5 + tests/server/orchestration/api/test_flows.py | 4 +- .../orchestration/api/test_task_runs.py | 7 + tests/telemetry/instrumentation_tester.py | 4 +- tests/test_flow_engine.py | 29 +- tests/test_types.py | 4 +- 25 files changed, 727 insertions(+), 36 deletions(-) create mode 100644 src/prefect/server/database/migrations/versions/postgresql/2024_11_15_150706_68a44144428d_add_labels_column_to_flow_flowrun_.py create mode 100644 src/prefect/server/database/migrations/versions/sqlite/2024_11_15_151042_5952a5498b51_add_labels_column_to_flow_flowrun_.py diff --git a/docs/v3/api-ref/rest-api/server/schema.json b/docs/v3/api-ref/rest-api/server/schema.json index aad0b16bc915..13571d0144d9 100644 --- a/docs/v3/api-ref/rest-api/server/schema.json +++ b/docs/v3/api-ref/rest-api/server/schema.json @@ -15242,6 +15242,40 @@ ] ] }, + "labels": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Labels", + "description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + "examples": [ + { + "key": "value1", + "key2": 42 + } + ] + }, "pull_steps": { "anyOf": [ { @@ -15981,6 +16015,33 @@ ] ] }, + "labels": { + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + }, + "type": "object", + "title": "Labels", + "description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + "examples": [ + { + "key": "value1", + "key2": 42 + } + ] + }, "work_queue_name": { "anyOf": [ { @@ -17184,6 +17245,40 @@ "tag-2" ] ] + }, + "labels": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Labels", + "description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + "examples": [ + { + "key": "value1", + "key2": 42 + } + ] } }, "type": "object", @@ -17217,6 +17312,40 @@ "tag-2" ] ] + }, + "labels": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Labels", + "description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + "examples": [ + { + "key": "value1", + "key2": 42 + } + ] } }, "additionalProperties": false, @@ -17616,6 +17745,40 @@ ] ] }, + "labels": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Labels", + "description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + "examples": [ + { + "key": "value1", + "key2": 42 + } + ] + }, "parent_task_run_id": { "anyOf": [ { @@ -17904,6 +18067,40 @@ ] ] }, + "labels": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Labels", + "description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + "examples": [ + { + "key": "value1", + "key2": 42 + } + ] + }, "idempotency_key": { "anyOf": [ { @@ -19360,6 +19557,33 @@ ] ] }, + "labels": { + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + }, + "type": "object", + "title": "Labels", + "description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + "examples": [ + { + "key": "value1", + "key2": 42 + } + ] + }, "parent_task_run_id": { "anyOf": [ { @@ -21967,6 +22191,40 @@ ] ] }, + "labels": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Labels", + "description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + "examples": [ + { + "key": "value1", + "key2": 42 + } + ] + }, "state_id": { "anyOf": [ { @@ -22239,6 +22497,40 @@ ] ] }, + "labels": { + "anyOf": [ + { + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + }, + "type": "object" + }, + { + "type": "null" + } + ], + "title": "Labels", + "description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + "examples": [ + { + "key": "value1", + "key2": 42 + } + ] + }, "task_inputs": { "additionalProperties": { "items": { diff --git a/src/prefect/client/schemas/actions.py b/src/prefect/client/schemas/actions.py index 2e918f5d588b..c6f7adf40d05 100644 --- a/src/prefect/client/schemas/actions.py +++ b/src/prefect/client/schemas/actions.py @@ -27,6 +27,7 @@ from prefect.settings import PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS from prefect.types import ( MAX_VARIABLE_NAME_LENGTH, + KeyValueLabelsField, Name, NonEmptyishName, NonNegativeFloat, @@ -66,6 +67,7 @@ class FlowCreate(ActionBaseModel): description="A list of flow tags", examples=[["tag-1", "tag-2"]], ) + labels: KeyValueLabelsField class FlowUpdate(ActionBaseModel): @@ -159,6 +161,7 @@ def convert_to_strings(cls, values): description="Parameters for flow runs scheduled by the deployment.", ) tags: List[str] = Field(default_factory=list) + labels: KeyValueLabelsField pull_steps: Optional[List[dict]] = Field(None) work_queue_name: Optional[str] = Field(None) @@ -315,6 +318,7 @@ class TaskRunCreate(ActionBaseModel): default_factory=objects.TaskRunPolicy, ) tags: List[str] = Field(default_factory=list) + labels: KeyValueLabelsField task_inputs: Dict[ str, List[ @@ -357,6 +361,7 @@ class FlowRunCreate(ActionBaseModel): default_factory=objects.FlowRunPolicy ) tags: List[str] = Field(default_factory=list) + labels: KeyValueLabelsField idempotency_key: Optional[str] = Field(None) diff --git a/src/prefect/client/schemas/objects.py b/src/prefect/client/schemas/objects.py index 5a01ad8d340d..df4661d65d1b 100644 --- a/src/prefect/client/schemas/objects.py +++ b/src/prefect/client/schemas/objects.py @@ -53,7 +53,7 @@ from prefect.settings import PREFECT_CLOUD_API_URL, PREFECT_CLOUD_UI_URL from prefect.types import ( MAX_VARIABLE_NAME_LENGTH, - KeyValueLabels, + KeyValueLabelsField, Name, NonNegativeInteger, PositiveInteger, @@ -559,11 +559,7 @@ class FlowRun(ObjectBaseModel): description="A list of tags on the flow run", examples=[["tag-1", "tag-2"]], ) - labels: KeyValueLabels = Field( - default_factory=dict, - description="Prefect Cloud: A dictionary of key-value labels. Values can be strings, numbers, or booleans.", - examples=[{"key": "value1", "key2": 42}], - ) + labels: KeyValueLabelsField parent_task_run_id: Optional[UUID] = Field( default=None, description=( @@ -800,6 +796,7 @@ class TaskRun(ObjectBaseModel): description="A list of tags for the task run.", examples=[["tag-1", "tag-2"]], ) + labels: KeyValueLabelsField state_id: Optional[UUID] = Field( default=None, description="The id of the current task run state." ) @@ -1055,6 +1052,7 @@ class Flow(ObjectBaseModel): description="A list of flow tags", examples=[["tag-1", "tag-2"]], ) + labels: KeyValueLabelsField class DeploymentSchedule(ObjectBaseModel): @@ -1113,6 +1111,7 @@ class Deployment(ObjectBaseModel): description="A list of tags for the deployment", examples=[["tag-1", "tag-2"]], ) + labels: KeyValueLabelsField work_queue_name: Optional[str] = Field( default=None, description=( diff --git a/src/prefect/client/schemas/responses.py b/src/prefect/client/schemas/responses.py index 08e7b4f82115..29102b65f022 100644 --- a/src/prefect/client/schemas/responses.py +++ b/src/prefect/client/schemas/responses.py @@ -9,6 +9,7 @@ import prefect.client.schemas.objects as objects from prefect._internal.schemas.bases import ObjectBaseModel, PrefectBaseModel from prefect._internal.schemas.fields import CreatedBy, UpdatedBy +from prefect.types import KeyValueLabelsField from prefect.utilities.collections import AutoEnum from prefect.utilities.names import generate_slug @@ -201,6 +202,7 @@ class FlowRunResponse(ObjectBaseModel): description="A list of tags on the flow run", examples=[["tag-1", "tag-2"]], ) + labels: KeyValueLabelsField parent_task_run_id: Optional[UUID] = Field( default=None, description=( @@ -353,6 +355,7 @@ class DeploymentResponse(ObjectBaseModel): description="A list of tags for the deployment", examples=[["tag-1", "tag-2"]], ) + labels: KeyValueLabelsField work_queue_name: Optional[str] = Field( default=None, description=( diff --git a/src/prefect/server/database/migrations/MIGRATION-NOTES.md b/src/prefect/server/database/migrations/MIGRATION-NOTES.md index d6d04831ed31..285a6f88599f 100644 --- a/src/prefect/server/database/migrations/MIGRATION-NOTES.md +++ b/src/prefect/server/database/migrations/MIGRATION-NOTES.md @@ -8,6 +8,10 @@ Each time a database migration is written, an entry is included here with: This gives us a history of changes and will create merge conflicts if two migrations are made at once, flagging situations where a branch needs to be updated before merging. +# Add `labels` column to Flow, FlowRun, TaskRun, and Deployment +SQLite: `5952a5498b51` +Postgres: `68a44144428d` + # Migrate `Deployment.concurrency_limit` to a foreign key `Deployment.concurrency_limit_id` SQLite: `4ad4658cbefe` Postgres: `eaec5004771f` diff --git a/src/prefect/server/database/migrations/versions/postgresql/2024_11_15_150706_68a44144428d_add_labels_column_to_flow_flowrun_.py b/src/prefect/server/database/migrations/versions/postgresql/2024_11_15_150706_68a44144428d_add_labels_column_to_flow_flowrun_.py new file mode 100644 index 000000000000..7b5ee2a88d29 --- /dev/null +++ b/src/prefect/server/database/migrations/versions/postgresql/2024_11_15_150706_68a44144428d_add_labels_column_to_flow_flowrun_.py @@ -0,0 +1,59 @@ +"""Add `labels` column to Flow, FlowRun, TaskRun, and Deployment + +Revision ID: 68a44144428d +Revises: eaec5004771f +Create Date: 2024-11-15 15:07:06.141947 +""" + +import sqlalchemy as sa +from alembic import op + +import prefect + +# revision identifiers, used by Alembic. +revision = "68a44144428d" +down_revision = "eaec5004771f" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "deployment", + sa.Column( + "labels", + prefect.server.utilities.database.JSON(astext_type=sa.Text()), + nullable=True, + ), + ) + op.add_column( + "flow", + sa.Column( + "labels", + prefect.server.utilities.database.JSON(astext_type=sa.Text()), + nullable=True, + ), + ) + op.add_column( + "flow_run", + sa.Column( + "labels", + prefect.server.utilities.database.JSON(astext_type=sa.Text()), + nullable=True, + ), + ) + op.add_column( + "task_run", + sa.Column( + "labels", + prefect.server.utilities.database.JSON(astext_type=sa.Text()), + nullable=True, + ), + ) + + +def downgrade(): + op.drop_column("task_run", "labels") + op.drop_column("flow_run", "labels") + op.drop_column("flow", "labels") + op.drop_column("deployment", "labels") diff --git a/src/prefect/server/database/migrations/versions/sqlite/2024_11_15_151042_5952a5498b51_add_labels_column_to_flow_flowrun_.py b/src/prefect/server/database/migrations/versions/sqlite/2024_11_15_151042_5952a5498b51_add_labels_column_to_flow_flowrun_.py new file mode 100644 index 000000000000..e9be03a49591 --- /dev/null +++ b/src/prefect/server/database/migrations/versions/sqlite/2024_11_15_151042_5952a5498b51_add_labels_column_to_flow_flowrun_.py @@ -0,0 +1,70 @@ +"""Add `labels` column to Flow, FlowRun, TaskRun, and Deployment + +Revision ID: 5952a5498b51 +Revises: 4ad4658cbefe +Create Date: 2024-11-15 15:10:42.138653 + +""" + +import sqlalchemy as sa +from alembic import op + +import prefect + +# revision identifiers, used by Alembic. +revision = "5952a5498b51" +down_revision = "4ad4658cbefe" +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table("deployment", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "labels", + prefect.server.utilities.database.JSON(astext_type=sa.Text()), + nullable=True, + ) + ) + + with op.batch_alter_table("flow", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "labels", + prefect.server.utilities.database.JSON(astext_type=sa.Text()), + nullable=True, + ) + ) + + with op.batch_alter_table("flow_run", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "labels", + prefect.server.utilities.database.JSON(astext_type=sa.Text()), + nullable=True, + ) + ) + + with op.batch_alter_table("task_run", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "labels", + prefect.server.utilities.database.JSON(astext_type=sa.Text()), + nullable=True, + ) + ) + + +def downgrade(): + with op.batch_alter_table("task_run", schema=None) as batch_op: + batch_op.drop_column("labels") + + with op.batch_alter_table("flow_run", schema=None) as batch_op: + batch_op.drop_column("labels") + + with op.batch_alter_table("flow", schema=None) as batch_op: + batch_op.drop_column("labels") + + with op.batch_alter_table("deployment", schema=None) as batch_op: + batch_op.drop_column("labels") diff --git a/src/prefect/server/database/orm_models.py b/src/prefect/server/database/orm_models.py index fb2643791d03..cfdc3c0645cd 100644 --- a/src/prefect/server/database/orm_models.py +++ b/src/prefect/server/database/orm_models.py @@ -141,6 +141,9 @@ class Flow(Base): tags: Mapped[List[str]] = mapped_column( JSON, server_default="[]", default=list, nullable=False ) + labels: Mapped[Union[schemas.core.KeyValueLabels, None]] = mapped_column( + JSON, nullable=True + ) flow_runs = sa.orm.relationship("FlowRun", back_populates="flow", lazy="raise") deployments = sa.orm.relationship("Deployment", back_populates="flow", lazy="raise") @@ -514,6 +517,9 @@ class FlowRun(Run): tags: Mapped[List[str]] = mapped_column( JSON, server_default="[]", default=list, nullable=False ) + labels: Mapped[Union[schemas.core.KeyValueLabels, None]] = mapped_column( + JSON, nullable=True + ) created_by: Mapped[Union[schemas.core.CreatedBy, None]] = mapped_column( Pydantic(schemas.core.CreatedBy), @@ -713,6 +719,9 @@ class TaskRun(Run): tags: Mapped[List[str]] = mapped_column( JSON, server_default="[]", default=list, nullable=False ) + labels: Mapped[Union[schemas.core.KeyValueLabels, None]] = mapped_column( + JSON, nullable=True + ) # TODO remove this foreign key for significant delete performance gains state_id = sa.Column( @@ -901,6 +910,9 @@ def job_variables(self): tags: Mapped[List[str]] = mapped_column( JSON, server_default="[]", default=list, nullable=False ) + labels: Mapped[Union[schemas.core.KeyValueLabels, None]] = mapped_column( + JSON, nullable=True + ) parameters = sa.Column(JSON, server_default="{}", default=dict, nullable=False) pull_steps = sa.Column(JSON, default=list, nullable=True) parameter_openapi_schema = sa.Column(JSON, default=dict, nullable=True) diff --git a/src/prefect/server/models/deployments.py b/src/prefect/server/models/deployments.py index 9e0beea34745..62c13dadc30a 100644 --- a/src/prefect/server/models/deployments.py +++ b/src/prefect/server/models/deployments.py @@ -4,7 +4,7 @@ """ import datetime -from typing import Dict, Iterable, List, Optional, Sequence, TypeVar +from typing import Dict, Iterable, List, Optional, Sequence, TypeVar, cast from uuid import UUID, uuid4 import pendulum @@ -82,6 +82,8 @@ async def create_deployment( # https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#the-set-clause deployment.updated = pendulum.now("UTC") # type: ignore[assignment] + deployment.labels = await with_system_labels_for_deployment(session, deployment) + schedules = deployment.schedules insert_values = deployment.model_dump_for_orm( exclude_unset=True, exclude={"schedules"} @@ -1082,3 +1084,24 @@ async def mark_deployments_not_ready( occurred=pendulum.now("UTC"), ) ) + + +async def with_system_labels_for_deployment( + session: AsyncSession, + deployment: schemas.core.Deployment, +) -> schemas.core.KeyValueLabels: + """Augment user supplied labels with system default labels for a deployment.""" + default_labels = cast( + schemas.core.KeyValueLabels, + { + "prefect.flow.id": str(deployment.flow_id), + }, + ) + + user_supplied_labels = deployment.labels or {} + + parent_labels = ( + await models.flows.read_flow_labels(session, deployment.flow_id) + ) or {} + + return parent_labels | default_labels | user_supplied_labels diff --git a/src/prefect/server/models/flow_runs.py b/src/prefect/server/models/flow_runs.py index 93acd1821139..fbb6f522f61b 100644 --- a/src/prefect/server/models/flow_runs.py +++ b/src/prefect/server/models/flow_runs.py @@ -6,7 +6,18 @@ import contextlib import datetime from itertools import chain -from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union +from typing import ( + Any, + Dict, + List, + Optional, + Sequence, + Tuple, + Type, + TypeVar, + Union, + cast, +) from uuid import UUID import pendulum @@ -60,6 +71,10 @@ async def create_flow_run( now = pendulum.now("UTC") # model: Union[orm_models.FlowRun, None] = None + flow_run.labels = await with_system_labels_for_flow_run( + session=session, flow_run=flow_run + ) + flow_run_dict = dict( **flow_run.model_dump_for_orm( exclude={ @@ -582,3 +597,39 @@ async def read_flow_run_graph( max_nodes=PREFECT_API_MAX_FLOW_RUN_GRAPH_NODES.value(), max_artifacts=PREFECT_API_MAX_FLOW_RUN_GRAPH_ARTIFACTS.value(), ) + + +async def with_system_labels_for_flow_run( + session: AsyncSession, + flow_run: Union[schemas.core.FlowRun, schemas.actions.FlowRunCreate], +) -> schemas.core.KeyValueLabels: + """Augment user supplied labels with system default labels for a flow + run.""" + + default_labels = cast( + schemas.core.KeyValueLabels, + { + "prefect.flow.id": str(flow_run.flow_id), + }, + ) + + parent_labels: schemas.core.KeyValueLabels = {} + user_supplied_labels = flow_run.labels or {} + + # `deployment_id` is deprecated on `schemas.actions.FlowRunCreate`. Only + # check `deployment_id` if given an instance of a `schemas.core.FlowRun`. + if isinstance(flow_run, schemas.core.FlowRun) and flow_run.deployment_id: + default_labels["prefect.deployment.id"] = str(flow_run.deployment_id) + deployment = await models.deployments.read_deployment( + session, deployment_id=flow_run.deployment_id + ) + parent_labels = deployment.labels if deployment and deployment.labels else {} + else: + # If the flow run is not part of a deployment then we need to check for + # labels from the flow. We don't use this when there is a deployment as + # the deployment would have inherited the flow labels already. + parent_labels = ( + await models.flows.read_flow_labels(session, flow_run.flow_id) or {} + ) + + return parent_labels | default_labels | user_supplied_labels diff --git a/src/prefect/server/models/flows.py b/src/prefect/server/models/flows.py index be9040910821..ba3243468b01 100644 --- a/src/prefect/server/models/flows.py +++ b/src/prefect/server/models/flows.py @@ -280,3 +280,14 @@ async def delete_flow(session: AsyncSession, flow_id: UUID) -> bool: delete(orm_models.Flow).where(orm_models.Flow.id == flow_id) ) return result.rowcount > 0 + + +async def read_flow_labels( + session: AsyncSession, + flow_id: UUID, +) -> Union[schemas.core.KeyValueLabels, None]: + result = await session.execute( + select(orm_models.Flow.labels).where(orm_models.Flow.id == flow_id) + ) + + return result.scalar() diff --git a/src/prefect/server/models/task_runs.py b/src/prefect/server/models/task_runs.py index 35038c04743d..9f1be084612a 100644 --- a/src/prefect/server/models/task_runs.py +++ b/src/prefect/server/models/task_runs.py @@ -4,7 +4,7 @@ """ import contextlib -from typing import Any, Dict, Optional, Sequence, Type, TypeVar, Union +from typing import Any, Dict, Optional, Sequence, Type, TypeVar, Union, cast from uuid import UUID import pendulum @@ -59,6 +59,10 @@ async def create_task_run( now = pendulum.now("UTC") model: Union[orm_models.TaskRun, None] + task_run.labels = await with_system_labels_for_task_run( + session=session, task_run=task_run + ) + # if a dynamic key exists, we need to guard against conflicts if task_run.flow_run_id: insert_stmt = ( @@ -481,3 +485,24 @@ async def set_task_run_state( ) return result + + +async def with_system_labels_for_task_run( + session: AsyncSession, + task_run: schemas.core.TaskRun, +) -> schemas.core.KeyValueLabels: + """Augment user supplied labels with system default labels for a task + run.""" + + client_supplied_labels = task_run.labels or {} + default_labels = cast(schemas.core.KeyValueLabels, {}) + parent_labels: schemas.core.KeyValueLabels = {} + + if task_run.flow_run_id: + default_labels["prefect.flow-run.id"] = str(task_run.flow_run_id) + flow_run = await models.flow_runs.read_flow_run( + session=session, flow_run_id=task_run.flow_run_id + ) + parent_labels = flow_run.labels if flow_run and flow_run.labels else {} + + return parent_labels | default_labels | client_supplied_labels diff --git a/src/prefect/server/schemas/actions.py b/src/prefect/server/schemas/actions.py index 67719eead7dd..e95efe7c741d 100644 --- a/src/prefect/server/schemas/actions.py +++ b/src/prefect/server/schemas/actions.py @@ -80,6 +80,11 @@ class FlowCreate(ActionBaseModel): description="A list of flow tags", examples=[["tag-1", "tag-2"]], ) + labels: Union[schemas.core.KeyValueLabels, None] = Field( + default_factory=dict, + description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + examples=[{"key": "value1", "key2": 42}], + ) class FlowUpdate(ActionBaseModel): @@ -176,6 +181,11 @@ class DeploymentCreate(ActionBaseModel): description="A list of deployment tags.", examples=[["tag-1", "tag-2"]], ) + labels: Union[schemas.core.KeyValueLabels, None] = Field( + default_factory=dict, + description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + examples=[{"key": "value1", "key2": 42}], + ) pull_steps: Optional[List[dict]] = Field(None) work_queue_name: Optional[str] = Field(None) @@ -427,6 +437,11 @@ class TaskRunCreate(ActionBaseModel): description="A list of tags for the task run.", examples=[["tag-1", "tag-2"]], ) + labels: Union[schemas.core.KeyValueLabels, None] = Field( + default_factory=dict, + description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + examples=[{"key": "value1", "key2": 42}], + ) task_inputs: Dict[ str, List[ @@ -502,6 +517,11 @@ class FlowRunCreate(ActionBaseModel): description="A list of tags for the flow run.", examples=[["tag-1", "tag-2"]], ) + labels: Union[schemas.core.KeyValueLabels, None] = Field( + default_factory=dict, + description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + examples=[{"key": "value1", "key2": 42}], + ) idempotency_key: Optional[str] = Field( None, description=( diff --git a/src/prefect/server/schemas/core.py b/src/prefect/server/schemas/core.py index c6d3a692fb24..496941340a3d 100644 --- a/src/prefect/server/schemas/core.py +++ b/src/prefect/server/schemas/core.py @@ -11,6 +11,9 @@ BaseModel, ConfigDict, Field, + StrictBool, + StrictFloat, + StrictInt, field_validator, model_validator, ) @@ -76,6 +79,8 @@ DEFAULT_BLOCK_SCHEMA_VERSION = "non-versioned" +KeyValueLabels = dict[str, Union[StrictBool, StrictInt, StrictFloat, str]] + class Flow(ORMBaseModel): """An ORM representation of flow data.""" @@ -88,6 +93,11 @@ class Flow(ORMBaseModel): description="A list of flow tags", examples=[["tag-1", "tag-2"]], ) + labels: Union[KeyValueLabels, None] = Field( + default_factory=dict, + description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + examples=[{"key": "value1", "key2": 42}], + ) class FlowRunPolicy(PrefectBaseModel): @@ -225,6 +235,11 @@ class FlowRun(ORMBaseModel): description="A list of tags on the flow run", examples=[["tag-1", "tag-2"]], ) + labels: Union[KeyValueLabels, None] = Field( + default_factory=dict, + description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + examples=[{"key": "value1", "key2": 42}], + ) parent_task_run_id: Optional[UUID] = Field( default=None, description=( @@ -440,6 +455,11 @@ class TaskRun(ORMBaseModel): description="A list of tags for the task run.", examples=[["tag-1", "tag-2"]], ) + labels: Union[KeyValueLabels, None] = Field( + default_factory=dict, + description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + examples=[{"key": "value1", "key2": 42}], + ) state_id: Optional[UUID] = Field( default=None, description="The id of the current task run state." ) @@ -584,6 +604,11 @@ class Deployment(ORMBaseModel): description="A list of tags for the deployment", examples=[["tag-1", "tag-2"]], ) + labels: Union[KeyValueLabels, None] = Field( + default_factory=dict, + description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + examples=[{"key": "value1", "key2": 42}], + ) work_queue_name: Optional[str] = Field( default=None, description=( diff --git a/src/prefect/server/schemas/responses.py b/src/prefect/server/schemas/responses.py index fd8d7cab0b5f..207631d9473e 100644 --- a/src/prefect/server/schemas/responses.py +++ b/src/prefect/server/schemas/responses.py @@ -19,6 +19,7 @@ WorkQueueStatusDetail, ) from prefect.server.utilities.schemas.bases import ORMBaseModel, PrefectBaseModel +from prefect.types import KeyValueLabelsField from prefect.utilities.collections import AutoEnum from prefect.utilities.names import generate_slug @@ -229,6 +230,7 @@ class FlowRunResponse(ORMBaseModel): description="A list of tags on the flow run", examples=[["tag-1", "tag-2"]], ) + labels: KeyValueLabelsField parent_task_run_id: Optional[UUID] = Field( default=None, description=( @@ -386,6 +388,7 @@ class DeploymentResponse(ORMBaseModel): description="A list of tags for the deployment", examples=[["tag-1", "tag-2"]], ) + labels: KeyValueLabelsField work_queue_name: Optional[str] = Field( default=None, description=( diff --git a/src/prefect/types/__init__.py b/src/prefect/types/__init__.py index 9c4afda93936..14d07397f7bc 100644 --- a/src/prefect/types/__init__.py +++ b/src/prefect/types/__init__.py @@ -1,6 +1,6 @@ from functools import partial -from typing import Annotated, Any, Dict, List, Set, TypeVar, Union -from typing_extensions import Literal +from typing import Annotated, Any, Dict, List, Optional, Set, TypeVar, Union +from typing_extensions import Literal, TypeAlias import orjson import pydantic @@ -148,9 +148,29 @@ def validate_set_T_from_delim_string( BeforeValidator(lambda x: x.upper()), ] + +KeyValueLabels: TypeAlias = dict[str, Union[StrictBool, StrictInt, StrictFloat, str]] + + +def convert_none_to_empty_dict(v: Optional[KeyValueLabels]) -> KeyValueLabels: + return v or {} + + +KeyValueLabelsField = Annotated[ + KeyValueLabels, + Field( + default_factory=dict, + description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", + examples=[{"key": "value1", "key2": 42}], + ), + BeforeValidator(convert_none_to_empty_dict), +] + + __all__ = [ "ClientRetryExtraCodes", "LogLevel", + "KeyValueLabelsField", "NonNegativeInteger", "PositiveInteger", "ListOfNonEmptyStrings", diff --git a/tests/events/client/instrumentation/test_task_run_state_change_events.py b/tests/events/client/instrumentation/test_task_run_state_change_events.py index b402b8d5d1f7..94006c9d6b5b 100644 --- a/tests/events/client/instrumentation/test_task_run_state_change_events.py +++ b/tests/events/client/instrumentation/test_task_run_state_change_events.py @@ -86,6 +86,7 @@ def happy_path(): "name": task_run.name, "run_count": 0, "tags": [], + "labels": {}, "task_inputs": {}, "total_run_time": 0.0, }, @@ -141,6 +142,7 @@ def happy_path(): "name": task_run.name, "run_count": 1, "tags": [], + "labels": {}, "task_inputs": {}, "total_run_time": 0.0, }, @@ -201,6 +203,7 @@ def happy_path(): "name": task_run.name, "run_count": 1, "tags": [], + "labels": {}, "task_inputs": {}, }, } @@ -281,6 +284,7 @@ def happy_path(): "name": task_run.name, "run_count": 0, "tags": [], + "labels": {}, "task_inputs": {}, "total_run_time": 0.0, }, @@ -336,6 +340,7 @@ def happy_path(): "name": task_run.name, "run_count": 1, "tags": [], + "labels": {}, "task_inputs": {}, "total_run_time": 0.0, }, @@ -401,6 +406,7 @@ def happy_path(): "name": task_run.name, "run_count": 1, "tags": [], + "labels": {}, "task_inputs": {}, }, } @@ -540,6 +546,7 @@ def happy_little_tree(): "flow_run_run_count": 0, "run_count": 0, "tags": [], + "labels": {}, "task_inputs": {}, "total_run_time": 0.0, }, diff --git a/tests/server/models/test_deployments.py b/tests/server/models/test_deployments.py index 852895ad0389..bf99abc96858 100644 --- a/tests/server/models/test_deployments.py +++ b/tests/server/models/test_deployments.py @@ -1609,3 +1609,53 @@ async def test_can_delete_all_schedules( deployment_id=deployment.id, ) assert len(schedules) == 0 + + +class TestDeploymentLabels: + @pytest.mark.parametrize( + "user_supplied_labels,parent_flow_labels,expected", + [ + ( + {"env": "prod", "label-on-flow": "from-user"}, + {"label-on-flow": "from-flow", "flow-label": 15}, + { + "label-on-flow": "from-user", + "env": "prod", + "flow-label": 15, + # "prefect.flow.id": added within test + }, + ), + ( + {}, + {}, + { + # "prefect.flow.id": added within test + }, + ), + ], + ) + async def test_with_system_labels_for_deployment( + self, + user_supplied_labels, + parent_flow_labels, + expected, + session: AsyncSession, + flow: orm_models.Flow, + ): + deployment_data = schemas.core.Deployment( + name="Test Deployment with labels", + flow_id=flow.id, + labels=user_supplied_labels, + tags=["d-tag"], + ) + flow.labels = parent_flow_labels + session.add(flow) + await session.commit() + expected["prefect.flow.id"] = str(flow.id) + + merged_labels = await models.deployments.with_system_labels_for_deployment( + session=session, + deployment=deployment_data, + ) + + assert merged_labels == expected diff --git a/tests/server/orchestration/api/test_deployments.py b/tests/server/orchestration/api/test_deployments.py index 16ef2ed7cda8..35b23578ec9d 100644 --- a/tests/server/orchestration/api/test_deployments.py +++ b/tests/server/orchestration/api/test_deployments.py @@ -86,6 +86,7 @@ async def test_create_deployment( entrypoint="/file.py:flow", flow_id=flow.id, tags=["foo"], + labels={"env": "dev"}, parameters={"foo": "bar"}, job_variables={"cpu": 24}, storage_document_id=storage_document_id, @@ -101,6 +102,10 @@ async def test_create_deployment( assert deployment_response.storage_document_id == storage_document_id assert deployment_response.job_variables == {"cpu": 24} assert deployment_response.status == "NOT_READY" + assert deployment_response.labels == { + "env": "dev", + "prefect.flow.id": str(flow.id), + } deployment = await models.deployments.read_deployment( session=session, deployment_id=deployment_response.id @@ -2570,6 +2575,10 @@ async def test_create_flow_run_from_deployment_with_defaults( assert response.json()["work_queue_name"] == deployment.work_queue_name assert response.json()["state_type"] == schemas.states.StateType.SCHEDULED assert response.json()["deployment_version"] is None + assert response.json()["labels"] == { + "prefect.flow.id": str(deployment.flow_id), + "prefect.deployment.id": str(deployment.id), + } async def test_create_flow_run_from_deployment_with_deployment_version( self, deployment_with_version, client diff --git a/tests/server/orchestration/api/test_flow_runs.py b/tests/server/orchestration/api/test_flow_runs.py index 5b2ea1db1cca..afe4c4da5fc8 100644 --- a/tests/server/orchestration/api/test_flow_runs.py +++ b/tests/server/orchestration/api/test_flow_runs.py @@ -30,6 +30,7 @@ async def test_create_flow_run(self, flow, client, session): flow_id=flow.id, name="orange you glad i didn't say yellow salamander", state=client_states.Pending().to_state_create(), + labels={"env": "dev"}, ).model_dump(mode="json"), ) assert response.status_code == status.HTTP_201_CREATED, response.text @@ -39,6 +40,10 @@ async def test_create_flow_run(self, flow, client, session): assert ( response.json()["name"] == "orange you glad i didn't say yellow salamander" ) + assert response.json()["labels"] == { + "env": "dev", + "prefect.flow.id": str(flow.id), + } flow_run = await models.flow_runs.read_flow_run( session=session, flow_run_id=response.json()["id"] diff --git a/tests/server/orchestration/api/test_flows.py b/tests/server/orchestration/api/test_flows.py index 4adf8e4487fb..ca9842a2decc 100644 --- a/tests/server/orchestration/api/test_flows.py +++ b/tests/server/orchestration/api/test_flows.py @@ -11,14 +11,16 @@ class TestCreateFlow: async def test_create_flow(self, session, client): - flow_data = {"name": "my-flow"} + flow_data = {"name": "my-flow", "labels": {"env": "dev"}} response = await client.post("/flows/", json=flow_data) assert response.status_code == status.HTTP_201_CREATED assert response.json()["name"] == "my-flow" flow_id = response.json()["id"] flow = await models.flows.read_flow(session=session, flow_id=flow_id) + assert flow assert str(flow.id) == flow_id + assert flow.labels == {"env": "dev"} async def test_create_flow_populates_and_returned_created(self, client): now = pendulum.now(tz="UTC") diff --git a/tests/server/orchestration/api/test_task_runs.py b/tests/server/orchestration/api/test_task_runs.py index 0a4678e510b9..7a1d94d997d7 100644 --- a/tests/server/orchestration/api/test_task_runs.py +++ b/tests/server/orchestration/api/test_task_runs.py @@ -21,16 +21,23 @@ async def test_create_task_run(self, flow_run, client, session): "task_key": "my-task-key", "name": "my-cool-task-run-name", "dynamic_key": "0", + "labels": {"env": "dev"}, } response = await client.post("/task_runs/", json=task_run_data) assert response.status_code == status.HTTP_201_CREATED assert response.json()["flow_run_id"] == str(flow_run.id) assert response.json()["id"] assert response.json()["name"] == "my-cool-task-run-name" + assert response.json()["labels"] == { + "env": "dev", + "prefect.flow.id": str(flow_run.flow_id), + "prefect.flow-run.id": str(flow_run.id), + } task_run = await models.task_runs.read_task_run( session=session, task_run_id=response.json()["id"] ) + assert task_run assert task_run.flow_run_id == flow_run.id async def test_create_task_run_gracefully_upserts(self, flow_run, client): diff --git a/tests/telemetry/instrumentation_tester.py b/tests/telemetry/instrumentation_tester.py index 61d6f66a122d..f2df6dbe8281 100644 --- a/tests/telemetry/instrumentation_tester.py +++ b/tests/telemetry/instrumentation_tester.py @@ -95,8 +95,8 @@ def get_finished_spans(self): def assert_has_attributes(obj: HasAttributes, attributes: Dict[str, Any]): assert obj.attributes is not None for key, val in attributes.items(): - assert key in obj.attributes - assert obj.attributes[key] == val + assert key in obj.attributes, f"Key {key!r} not found in attributes" + assert obj.attributes[key] == val, f"Value for key {key!r} does not match" @staticmethod def assert_span_instrumented_for(span: Union[Span, ReadableSpan], module): diff --git a/tests/test_flow_engine.py b/tests/test_flow_engine.py index db0016756aa7..f00d130423cc 100644 --- a/tests/test_flow_engine.py +++ b/tests/test_flow_engine.py @@ -16,7 +16,7 @@ from prefect import Flow, __development_base_path__, flow, task from prefect.client.orchestration import PrefectClient, SyncPrefectClient from prefect.client.schemas.filters import FlowFilter, FlowRunFilter -from prefect.client.schemas.objects import FlowRun, StateType +from prefect.client.schemas.objects import StateType from prefect.client.schemas.sorting import FlowRunSort from prefect.concurrency.asyncio import concurrency as aconcurrency from prefect.concurrency.sync import concurrency @@ -45,7 +45,6 @@ from prefect.server.schemas.core import ConcurrencyLimitV2 from prefect.server.schemas.core import FlowRun as ServerFlowRun from prefect.testing.utilities import AsyncMock -from prefect.types import KeyValueLabels from prefect.utilities.callables import get_call_parameters from prefect.utilities.filesystem import tmpchdir @@ -1891,27 +1890,18 @@ def instrumented_flow(): assert span.status.status_code == trace.StatusCode.OK def test_flow_run_instrumentation_captures_labels( - self, instrumentation: InstrumentationTester, monkeypatch + self, + instrumentation: InstrumentationTester, + sync_prefect_client: SyncPrefectClient, ): - # simulate server responding with labels on flow run - class FlowRunWithLabels(FlowRun): - labels: KeyValueLabels = pydantic.Field( - default_factory=lambda: { - "prefect.deployment.id": "some-id", - "my-label": "my-value", - } - ) - - monkeypatch.setattr( - "prefect.client.orchestration.FlowRun", - FlowRunWithLabels, - ) - @flow def instrumented_flow(): pass - instrumented_flow() + state = instrumented_flow(return_state=True) + + assert state.state_details.flow_run_id is not None + flow_run = sync_prefect_client.read_flow_run(state.state_details.flow_run_id) spans = instrumentation.get_finished_spans() assert len(spans) == 1 @@ -1921,11 +1911,10 @@ def instrumented_flow(): instrumentation.assert_has_attributes( span, { + **flow_run.labels, "prefect.run.type": "flow", "prefect.flow.name": "instrumented-flow", "prefect.run.id": mock.ANY, - "prefect.deployment.id": "some-id", - "my-label": "my-value", }, ) diff --git a/tests/test_types.py b/tests/test_types.py index 7f1d962031d0..443bed6a8fdc 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -1,10 +1,10 @@ from pydantic import BaseModel -from prefect.types import KeyValueLabels +from prefect.types import KeyValueLabelsField def test_allow_none_as_empty_dict(): class Model(BaseModel): - labels: KeyValueLabels + labels: KeyValueLabelsField assert Model(labels=None).labels == {} # type: ignore