Skip to content

Commit

Permalink
Fix test_flow_run_instrumentation_captures_labels
Browse files Browse the repository at this point in the history
  • Loading branch information
bunchesofdonald committed Nov 20, 2024
1 parent 4885aba commit 2886b28
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/prefect/types/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import partial
from typing import Annotated, Any, Dict, List, Optional, Set, TypeAlias, TypeVar, Union
from typing_extensions import Literal
from typing import Annotated, Any, Dict, List, Optional, Set, TypeVar, Union
from typing_extensions import Literal, TypeAlias
import orjson
import pydantic

Expand Down
4 changes: 2 additions & 2 deletions tests/telemetry/instrumentation_tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ def get_finished_spans(self):
def assert_has_attributes(obj: HasAttributes, attributes: Dict[str, Any]):
assert obj.attributes is not None
for key, val in attributes.items():
assert key in obj.attributes
assert obj.attributes[key] == val
assert key in obj.attributes, f"Key {key!r} not found in attributes"
assert obj.attributes[key] == val, f"Value for key {key!r} does not match"

@staticmethod
def assert_span_instrumented_for(span: Union[Span, ReadableSpan], module):
Expand Down
28 changes: 9 additions & 19 deletions tests/test_flow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from prefect import Flow, __development_base_path__, flow, task
from prefect.client.orchestration import PrefectClient, SyncPrefectClient
from prefect.client.schemas.filters import FlowFilter, FlowRunFilter
from prefect.client.schemas.objects import FlowRun, StateType
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import FlowRunSort
from prefect.concurrency.asyncio import concurrency as aconcurrency
from prefect.concurrency.sync import concurrency
Expand Down Expand Up @@ -1891,27 +1891,18 @@ def instrumented_flow():
assert span.status.status_code == trace.StatusCode.OK

def test_flow_run_instrumentation_captures_labels(
self, instrumentation: InstrumentationTester, monkeypatch
self,
instrumentation: InstrumentationTester,
sync_prefect_client: SyncPrefectClient,
):
# simulate server responding with labels on flow run
class FlowRunWithLabels(FlowRun):
labels: KeyValueLabels = pydantic.Field(
default_factory=lambda: {
"prefect.deployment.id": "some-id",
"my-label": "my-value",
}
)

monkeypatch.setattr(
"prefect.client.orchestration.FlowRun",
FlowRunWithLabels,
)

@flow
def instrumented_flow():
pass

instrumented_flow()
state = instrumented_flow(return_state=True)

assert state.state_details.flow_run_id is not None
flow_run = sync_prefect_client.read_flow_run(state.state_details.flow_run_id)

spans = instrumentation.get_finished_spans()
assert len(spans) == 1
Expand All @@ -1921,11 +1912,10 @@ def instrumented_flow():
instrumentation.assert_has_attributes(
span,
{
**flow_run.labels,
"prefect.run.type": "flow",
"prefect.flow.name": "instrumented-flow",
"prefect.run.id": mock.ANY,
"prefect.deployment.id": "some-id",
"my-label": "my-value",
},
)

Expand Down

0 comments on commit 2886b28

Please sign in to comment.