Skip to content

Commit

Permalink
feat(worker): add SILVERBACK_FORK_MODE handler execution context (#157)
Browse files Browse the repository at this point in the history
* feat(worker): add SILVERBACK_FORK_MODE handler execution context

* refactor: use internal variable for fork detection to prevent scope bug

* fix: block other forks until ApeWorX/ape#2348

* refactor(fork): working with fork fix from ape core

depends: ApeWorX/ape#2349

---------

Co-authored-by: johnson2427 <[email protected]>
  • Loading branch information
fubuloubu and johnson2427 authored Nov 24, 2024
1 parent 282e910 commit e949e04
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 8 deletions.
4 changes: 4 additions & 0 deletions ape-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Ensure we are configured for fork mode correct for example
ethereum:
mainnet-fork:
default_provider: foundry
46 changes: 38 additions & 8 deletions silverback/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import atexit
import inspect
from collections import defaultdict
from datetime import timedelta
from typing import Any, Callable
from functools import wraps
from typing import Any, Awaitable, Callable

from ape.api.networks import LOCAL_NETWORK_NAME
from ape.contracts import ContractEvent, ContractInstance
Expand Down Expand Up @@ -108,17 +110,18 @@ def __init__(self, settings: Settings | None = None):

provider_context = settings.get_provider_context()
# NOTE: This allows using connected ape methods e.g. `Contract`
provider = provider_context.__enter__()
self.provider = provider_context.__enter__()

self.identifier = SilverbackID(
name=settings.BOT_NAME,
network=provider.network.name,
ecosystem=provider.network.ecosystem.name,
network=self.provider.network.name,
ecosystem=self.provider.network.ecosystem.name,
)

# Adjust defaults from connection
if settings.NEW_BLOCK_TIMEOUT is None and (
provider.network.name.endswith("-fork") or provider.network.name == LOCAL_NETWORK_NAME
self.provider.network.name.endswith("-fork")
or self.provider.network.name == LOCAL_NETWORK_NAME
):
settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds())

Expand All @@ -138,6 +141,7 @@ def __init__(self, settings: Settings | None = None):

self.signer = settings.get_signer()
self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT
self.use_fork = settings.FORK_MODE and not self.provider.network.name.endswith("-fork")

signer_str = f"\n SIGNER={repr(self.signer)}"
new_block_timeout_str = (
Expand All @@ -146,7 +150,9 @@ def __init__(self, settings: Settings | None = None):

network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}"
logger.success(
f'Loaded Silverback Bot:\n NETWORK="{network_choice}"'
"Loaded Silverback Bot:\n"
f' NETWORK="{network_choice}"\n'
f" FORK_MODE={self.use_fork}"
f"{signer_str}{new_block_timeout_str}"
)

Expand Down Expand Up @@ -225,6 +231,25 @@ async def __create_snapshot_handler(
last_block_processed=self.state.get("system:last_block_processed", -1),
)

# To ensure we don't have too many forks at once
# HACK: Until `NetworkManager.fork` (and `ProviderContextManager`) allows concurrency

def _with_fork_decorator(self, handler: Callable) -> Callable:
# Trigger worker-side handling using fork network by wrapping handler
fork_context = self.provider.network_manager.fork

@wraps(handler)
async def fork_handler(*args, **kwargs):
with fork_context():
result = handler(*args, **kwargs)

if inspect.isawaitable(result):
return await result

return result

return fork_handler

def broker_task_decorator(
self,
task_type: TaskType,
Expand Down Expand Up @@ -266,7 +291,9 @@ def broker_task_decorator(
raise ContainerTypeMismatchError(task_type, container)

# Register user function as task handler with our broker
def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
def add_taskiq_task(
handler: Callable[..., Any | Awaitable[Any]]
) -> AsyncTaskiqDecoratedTask:
labels = {"task_type": str(task_type)}

# NOTE: Do *not* do `if container` because that does a `len(container)` call,
Expand All @@ -276,14 +303,17 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
# Address is almost a certainty if the container is being used as a filter here.
if not (contract_address := getattr(container.contract, "address", None)):
raise InvalidContainerTypeError(
"Please provider a contract event from a valid contract instance."
"Please provide a contract event from a valid contract instance."
)

labels["contract_address"] = contract_address
labels["event_signature"] = container.abi.signature

self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels))

if self.use_fork:
handler = self._with_fork_decorator(handler)

return self.broker.register_task(
handler,
task_name=handler.__name__,
Expand Down
4 changes: 4 additions & 0 deletions silverback/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class Settings(BaseSettings, ManagerAccessMixin):
# A unique identifier for this silverback instance
BOT_NAME: str = "bot"

# Execute every handler using an independent fork context
# NOTE: Requires fork-able provider installed and configured for network
FORK_MODE: bool = False

BROKER_CLASS: str = "taskiq:InMemoryBroker"
BROKER_URI: str = "" # To be deprecated in 0.6
BROKER_KWARGS: dict[str, Any] = dict()
Expand Down

0 comments on commit e949e04

Please sign in to comment.