-
Notifications
You must be signed in to change notification settings - Fork 56
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
274 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# Sentry Sample | ||
|
||
This sample shows how to configure [Sentry](https://sentry.io) to intercept and capture errors from the Temporal SDK. | ||
|
||
For this sample, the optional `sentry` dependency group must be included. To include, run: | ||
|
||
poetry install --with sentry | ||
|
||
To run, first see [README.md](../README.md) for prerequisites. Set `SENTRY_DSN` environment variable to the Sentry DSN. | ||
Then, run the following from this directory to start the worker: | ||
|
||
poetry run python worker.py | ||
|
||
This will start the worker. Then, in another terminal, run the following to execute the workflow: | ||
|
||
poetry run python starter.py | ||
|
||
The workflow should complete with the hello result. If you alter the workflow or the activity to raise an | ||
`ApplicationError` instead, it should appear in Sentry. |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
from dataclasses import asdict, is_dataclass | ||
from typing import Any, Optional, Type, Union | ||
|
||
import sentry_sdk | ||
from temporalio import activity, workflow | ||
from temporalio.worker import ( | ||
ActivityInboundInterceptor, | ||
ExecuteActivityInput, | ||
ExecuteWorkflowInput, | ||
Interceptor, | ||
WorkflowInboundInterceptor, | ||
WorkflowInterceptorClassInput, | ||
) | ||
|
||
|
||
def _set_common_workflow_tags( | ||
info: Union[workflow.Info, activity.Info], scope: sentry_sdk.Scope | ||
): | ||
scope.set_tag("temporal.workflow.type", info.workflow_type) | ||
scope.set_tag("temporal.workflow.id", info.workflow_id) | ||
|
||
|
||
class _SentryActivityInboundInterceptor(ActivityInboundInterceptor): | ||
async def execute_activity(self, input: ExecuteActivityInput) -> Any: | ||
transaction_name = input.fn.__module__ + "." + input.fn.__qualname__ | ||
scope_ctx_manager = sentry_sdk.configure_scope() | ||
with scope_ctx_manager as scope, sentry_sdk.start_transaction( | ||
name=transaction_name | ||
): | ||
scope.set_tag("temporal.execution_type", "activity") | ||
activity_info = activity.info() | ||
_set_common_workflow_tags(activity_info, scope) | ||
scope.set_tag("temporal.activity.id", activity_info.activity_id) | ||
scope.set_tag("temporal.activity.type", activity_info.activity_type) | ||
scope.set_tag("temporal.activity.task_queue", activity_info.task_queue) | ||
scope.set_tag( | ||
"temporal.workflow.namespace", activity_info.workflow_namespace | ||
) | ||
scope.set_tag("temporal.workflow.run_id", activity_info.workflow_run_id) | ||
try: | ||
return await super().execute_activity(input) | ||
except Exception as e: | ||
if len(input.args) == 1 and is_dataclass(input.args[0]): | ||
scope.set_context("temporal.activity.input", asdict(input.args[0])) | ||
scope.set_context("temporal.activity.info", activity.info().__dict__) | ||
sentry_sdk.capture_exception(e) | ||
raise e | ||
finally: | ||
scope.clear() | ||
|
||
|
||
class _SentryWorkflowInterceptor(WorkflowInboundInterceptor): | ||
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: | ||
transaction_name = input.run_fn.__module__ + "." + input.run_fn.__qualname__ | ||
scope_ctx_manager = sentry_sdk.configure_scope() | ||
with scope_ctx_manager as scope, sentry_sdk.start_transaction( | ||
name=transaction_name | ||
): | ||
scope.set_tag("temporal.execution_type", "workflow") | ||
workflow_info = workflow.info() | ||
_set_common_workflow_tags(workflow_info, scope) | ||
scope.set_tag("temporal.workflow.task_queue", workflow_info.task_queue) | ||
scope.set_tag("temporal.workflow.namespace", workflow_info.namespace) | ||
scope.set_tag("temporal.workflow.run_id", workflow_info.run_id) | ||
try: | ||
return await super().execute_workflow(input) | ||
except Exception as e: | ||
if len(input.args) == 1 and is_dataclass(input.args[0]): | ||
scope.set_context("temporal.workflow.input", asdict(input.args[0])) | ||
scope.set_context("temporal.workflow.info", workflow.info().__dict__) | ||
sentry_sdk.capture_exception(e) | ||
raise e | ||
finally: | ||
scope.clear() | ||
|
||
|
||
class SentryInterceptor(Interceptor): | ||
"""Temporal Interceptor class which will report workflow & activity exceptions to Sentry""" | ||
|
||
def intercept_activity( | ||
self, next: ActivityInboundInterceptor | ||
) -> ActivityInboundInterceptor: | ||
"""Implementation of | ||
:py:meth:`temporalio.worker.Interceptor.intercept_activity`. | ||
""" | ||
return _SentryActivityInboundInterceptor(super().intercept_activity(next)) | ||
|
||
def workflow_interceptor_class( | ||
self, input: WorkflowInterceptorClassInput | ||
) -> Optional[Type[WorkflowInboundInterceptor]]: | ||
return _SentryWorkflowInterceptor |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import asyncio | ||
import os | ||
|
||
from temporalio.client import Client | ||
|
||
from sentry.worker import GreetingWorkflow | ||
|
||
|
||
async def main(): | ||
# Connect client | ||
client = await Client.connect("localhost:7233") | ||
|
||
# Run workflow | ||
result = await client.execute_workflow( | ||
GreetingWorkflow.run, | ||
"World", | ||
id="sentry-workflow-id", | ||
task_queue="sentry-task-queue", | ||
) | ||
print(f"Workflow result: {result}") | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import asyncio | ||
import logging | ||
import os | ||
from dataclasses import dataclass | ||
from datetime import timedelta | ||
|
||
import sentry_sdk | ||
from temporalio import activity, workflow | ||
from temporalio.client import Client | ||
from temporalio.worker import Worker | ||
|
||
from sentry.interceptor import SentryInterceptor | ||
|
||
|
||
@dataclass | ||
class ComposeGreetingInput: | ||
greeting: str | ||
name: str | ||
|
||
|
||
@activity.defn | ||
async def compose_greeting(input: ComposeGreetingInput) -> str: | ||
activity.logger.info("Running activity with parameter %s" % input) | ||
return f"{input.greeting}, {input.name}!" | ||
|
||
|
||
@workflow.defn | ||
class GreetingWorkflow: | ||
@workflow.run | ||
async def run(self, name: str) -> str: | ||
workflow.logger.info("Running workflow with parameter %s" % name) | ||
return await workflow.execute_activity( | ||
compose_greeting, | ||
ComposeGreetingInput("Hello", name), | ||
start_to_close_timeout=timedelta(seconds=10), | ||
) | ||
|
||
|
||
async def main(): | ||
# Uncomment the line below to see logging | ||
# logging.basicConfig(level=logging.INFO) | ||
|
||
# Initialize the Sentry SDK | ||
sentry_sdk.init( | ||
dsn=os.environ.get("SENTRY_DSN"), | ||
) | ||
|
||
# Start client | ||
client = await Client.connect("localhost:7233") | ||
|
||
# Run a worker for the workflow | ||
worker = Worker( | ||
client, | ||
task_queue="sentry-task-queue", | ||
workflows=[GreetingWorkflow], | ||
activities=[compose_greeting], | ||
interceptors=[SentryInterceptor()], # Use SentryInterceptor for error reporting | ||
) | ||
|
||
await worker.run() | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |