Skip to content

Commit

Permalink
Refactor: use method activity
Browse files Browse the repository at this point in the history
  • Loading branch information
pfranck committed Jul 8, 2024
1 parent af2c5c2 commit ac73803
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 16 deletions.
16 changes: 9 additions & 7 deletions polling/infrequent/activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ class ComposeGreetingInput:
name: str


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
attempt = activity.info().attempt - 1
test_service = TestService(attempt=attempt)
# If this raises an exception because it's not done yet, the activity will
# continually be scheduled for retry
return await test_service.get_service_result(input)
class ComposeGreeting:
def __init__(self):
self.test_service = TestService()

@activity.defn
async def compose_greeting(self, input: ComposeGreetingInput) -> str:
# If this raises an exception because it's not done yet, the activity will
# continually be scheduled for retry
return await self.test_service.get_service_result(input)
6 changes: 3 additions & 3 deletions polling/infrequent/run_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
from temporalio.client import Client
from temporalio.worker import Worker

from polling.infrequent.activities import compose_greeting
from polling.infrequent.activities import ComposeGreeting
from polling.infrequent.workflows import GreetingWorkflow


async def main():
client = await Client.connect("localhost:7233")

activities = ComposeGreeting()
worker = Worker(
client,
task_queue="infrequent-activity-retry-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
activities=[activities.compose_greeting],
)
await worker.run()

Expand Down
6 changes: 3 additions & 3 deletions polling/infrequent/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
from temporalio.common import RetryPolicy

with workflow.unsafe.imports_passed_through():
from polling.infrequent.activities import ComposeGreetingInput, compose_greeting
from polling.infrequent.activities import ComposeGreeting, ComposeGreetingInput


@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
compose_greeting,
return await workflow.execute_activity_method(
ComposeGreeting.compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=2),
retry_policy=RetryPolicy(
Expand Down
6 changes: 3 additions & 3 deletions polling/test_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
class TestService:
def __init__(self, attempt=0, error_attempts=5):
self.try_attempts = attempt
self.error_attempts = error_attempts
def __init__(self):
self.try_attempts = 0
self.error_attempts = 5

async def get_service_result(self, input):
print(
Expand Down

0 comments on commit ac73803

Please sign in to comment.