
Commit

Merge remote-tracking branch 'upstream/main' into feat/local_time
dtdang committed Nov 25, 2024
2 parents b7e49bf + e949e04 commit f6deedb
Showing 9 changed files with 72 additions and 32 deletions.
14 changes: 7 additions & 7 deletions .pre-commit-config.yaml
@@ -1,33 +1,33 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v5.0.0
hooks:
- id: check-yaml

- repo: https://github.com/pre-commit/mirrors-isort
rev: v5.10.1
- repo: https://github.com/PyCQA/isort
rev: 5.13.2
hooks:
- id: isort

- repo: https://github.com/psf/black
rev: 23.12.0
rev: 24.10.0
hooks:
- id: black
name: black

- repo: https://github.com/pycqa/flake8
rev: 6.1.0
rev: 7.1.1
hooks:
- id: flake8

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.7.1
rev: v1.13.0
hooks:
- id: mypy
additional_dependencies: [types-setuptools, pydantic]

- repo: https://github.com/executablebooks/mdformat
rev: 0.7.17
rev: 0.7.19
hooks:
- id: mdformat
additional_dependencies: [mdformat-gfm, mdformat-frontmatter]
4 changes: 4 additions & 0 deletions ape-config.yaml
@@ -0,0 +1,4 @@
# Ensure we are configured correctly for fork mode for the example
ethereum:
mainnet-fork:
default_provider: foundry
2 changes: 1 addition & 1 deletion docs/userguides/deploying.md
@@ -56,7 +56,7 @@ docker push your-registry-url/project/botA:latest

TODO: The ApeWorX team has GitHub Actions definitions for building, pushing and deploying.

If you are unfamiliar with docker and container registries, you can use the \[\[github-action\]\].
If you are unfamiliar with docker and container registries, you can use the \[[github-action]\].

You do not need to build using this command if you use the GitHub Action, but it is there to help you if you are having problems figuring out how to build and run your bot images on the cluster successfully.

10 changes: 5 additions & 5 deletions docs/userguides/development.md
@@ -141,13 +141,13 @@ def handle_on_worker_shutdown(state):

This function comes with a parameter `state` that you can use for storing the results of your startup computation or resources that you have provisioned.

It's important to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as a connection to the Telegram API, an SQL or NoSQL database, or something else. **This function will run on every worker process**.
It's important to note that this is useful for ensuring that your workers (of which there can be multiple) have the resources necessary to properly handle any updates you want to make in your handler functions, such as a connection to the Telegram API, an SQL or NoSQL database, or something else. **This function will run on every worker process**.

*New in 0.2.0*: These events moved from `on_startup()` and `on_shutdown()` for clarity.

#### Worker State

The `state` variable is also useful because it can be made available to each handler method, so other stateful quantities can be maintained for other uses. Each distributed worker has its own instance of state.
The `state` variable is also useful because it can be made available to each handler method, so other stateful quantities can be maintained for other uses. Each distributed worker has its own instance of state.

To access the state from a handler, you must annotate `context` as a dependency like so:

@@ -163,7 +163,7 @@ def block_handler(block, context: Annotated[Context, TaskiqDepends()]):

### Bot Events

You can also add a bot startup and shutdown handler that will be **executed once upon every bot startup**. This may be useful for things like processing historical events since the bot was shut down or other one-time actions to perform at startup.
You can also add a bot startup and shutdown handler that will be **executed once upon every bot startup**. This may be useful for things like processing historical events since the bot was shut down or other one-time actions to perform at startup.

```py
@bot.on_startup()
@@ -180,7 +180,7 @@ def handle_on_shutdown():
...
```

*Changed in 0.2.0*: The behavior of the `@bot.on_startup()` decorator and handler signature have changed. It is now executed only once upon bot startup, and worker events have moved to `@bot.on_worker_startup()`.
*Changed in 0.2.0*: The behavior of the `@bot.on_startup()` decorator and handler signature have changed. It is now executed only once upon bot startup, and worker events have moved to `@bot.on_worker_startup()`.

## Bot State

@@ -271,7 +271,7 @@ Use segregated keys and limit your risk by controlling the amount of funds that
Using only the `silverback run ...` command in a default configuration executes everything in one process and the job queue is completely in-memory with a shared state.
In some high volume environments, you may want to deploy your Silverback bot in a distributed configuration using multiple processes to handle the messages at a higher rate.

The primary components are the client and workers. The client handles Silverback events (blocks and contract event logs) and creates jobs for the workers to process in an asynchronous manner.
The primary components are the client and workers. The client handles Silverback events (blocks and contract event logs) and creates jobs for the workers to process in an asynchronous manner.

For this to work, you must configure a [TaskIQ broker](https://taskiq-python.github.io/guide/architecture-overview.html#broker) capable of distributed processing.
Additionally, it is highly suggested that you also configure a [TaskIQ result backend](https://taskiq-python.github.io/guide/architecture-overview.html#result-backend) in order to process and store the results of executing tasks.
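For context on the distributed setup described above: the broker is selected by a `module:ClassName` setting (the default `taskiq:InMemoryBroker` appears in `silverback/settings.py` later in this diff). Below is a minimal sketch of how such a spec can resolve to a TaskIQ broker instance; the `resolve_broker` helper is illustrative, not silverback's actual loader.

```py
from importlib import import_module

from taskiq import AsyncBroker


def resolve_broker(spec: str, **kwargs) -> AsyncBroker:
    # Split "module:ClassName", import the module, and instantiate the broker
    module_name, class_name = spec.split(":")
    broker_cls = getattr(import_module(module_name), class_name)
    return broker_cls(**kwargs)


broker = resolve_broker("taskiq:InMemoryBroker")
print(type(broker).__name__)  # InMemoryBroker
```

Swapping the spec for a distributed broker (e.g. one from `taskiq-redis`) is what moves task execution out of the single in-memory process.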
17 changes: 9 additions & 8 deletions setup.py
@@ -11,15 +11,15 @@
"hypothesis-jsonschema", # Generate strategies for pydantic models
],
"lint": [
"black>=24", # Auto-formatter and linter
"mypy>=1.10", # Static type analyzer
"black>=24.10.0,<25", # Auto-formatter and linter
"mypy>=1.13.0,<2", # Static type analyzer
"types-setuptools", # Needed for mypy type shed
"flake8>=7", # Style linter
"isort>=5.13", # Import sorting linter
"mdformat>=0.7", # Auto-formatter for markdown
"flake8>=7.1.1,<8", # Style linter
"isort>=5.13.2,<6", # Import sorting linter
"mdformat>=0.7.19", # Auto-formatter for markdown
"mdformat-gfm>=0.3.6", # Needed for formatting GitHub-flavored markdown
"mdformat-frontmatter>=2.0", # Needed for frontmatters-style headers in issue templates
"mdformat-pyproject>=0.0.1", # Allows configuring in pyproject.toml
"mdformat-pyproject>=0.0.2", # Allows configuring in pyproject.toml
],
"doc": ["sphinx-ape"],
"release": [ # `release` GitHub Action job uses this
@@ -63,14 +63,15 @@
install_requires=[
"apepay>=0.3.2,<1",
"click", # Use same version as eth-ape
"eth-ape>=0.7,<1.0",
"eth-ape>=0.8.19,<1.0",
"ethpm-types>=0.6.10", # lower pin only, `eth-ape` governs upper pin
"eth-pydantic-types", # Use same version as eth-ape
"packaging", # Use same version as eth-ape
"pydantic_settings", # Use same version as eth-ape
"taskiq[metrics]>=0.11.3,<0.12",
"taskiq[metrics]>=0.11.9,<0.12",
"tomlkit>=0.12,<1", # For reading/writing global platform profile
"fief-client[cli]>=0.19,<1", # for platform auth/cluster login
"websockets>=14.1,<15", # For subscriptions
],
entry_points={
"console_scripts": ["silverback=silverback._cli:cli"],
46 changes: 38 additions & 8 deletions silverback/main.py
@@ -1,7 +1,9 @@
import atexit
import inspect
from collections import defaultdict
from datetime import timedelta
from typing import Any, Callable
from functools import wraps
from typing import Any, Awaitable, Callable

from ape.api.networks import LOCAL_NETWORK_NAME
from ape.contracts import ContractEvent, ContractInstance
@@ -108,17 +110,18 @@ def __init__(self, settings: Settings | None = None):

provider_context = settings.get_provider_context()
# NOTE: This allows using connected ape methods e.g. `Contract`
provider = provider_context.__enter__()
self.provider = provider_context.__enter__()

self.identifier = SilverbackID(
name=settings.BOT_NAME,
network=provider.network.name,
ecosystem=provider.network.ecosystem.name,
network=self.provider.network.name,
ecosystem=self.provider.network.ecosystem.name,
)

# Adjust defaults from connection
if settings.NEW_BLOCK_TIMEOUT is None and (
provider.network.name.endswith("-fork") or provider.network.name == LOCAL_NETWORK_NAME
self.provider.network.name.endswith("-fork")
or self.provider.network.name == LOCAL_NETWORK_NAME
):
settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds())

@@ -138,6 +141,7 @@ def __init__(self, settings: Settings | None = None):

self.signer = settings.get_signer()
self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT
self.use_fork = settings.FORK_MODE and not self.provider.network.name.endswith("-fork")

signer_str = f"\n SIGNER={repr(self.signer)}"
new_block_timeout_str = (
@@ -146,7 +150,9 @@ def __init__(self, settings: Settings | None = None):

network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}"
logger.success(
f'Loaded Silverback Bot:\n NETWORK="{network_choice}"'
"Loaded Silverback Bot:\n"
f' NETWORK="{network_choice}"\n'
f" FORK_MODE={self.use_fork}"
f"{signer_str}{new_block_timeout_str}"
)

@@ -225,6 +231,25 @@ async def __create_snapshot_handler(
last_block_processed=self.state.get("system:last_block_processed", -1),
)

# To ensure we don't have too many forks at once
# HACK: Until `NetworkManager.fork` (and `ProviderContextManager`) allows concurrency

def _with_fork_decorator(self, handler: Callable) -> Callable:
# Trigger worker-side handling using fork network by wrapping handler
fork_context = self.provider.network_manager.fork

@wraps(handler)
async def fork_handler(*args, **kwargs):
with fork_context():
result = handler(*args, **kwargs)

if inspect.isawaitable(result):
return await result

return result

return fork_handler

def broker_task_decorator(
self,
task_type: TaskType,
@@ -266,7 +291,9 @@ def broker_task_decorator(
raise ContainerTypeMismatchError(task_type, container)

# Register user function as task handler with our broker
def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
def add_taskiq_task(
handler: Callable[..., Any | Awaitable[Any]]
) -> AsyncTaskiqDecoratedTask:
labels = {"task_type": str(task_type)}

# NOTE: Do *not* do `if container` because that does a `len(container)` call,
@@ -276,14 +303,17 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
# Address is almost a certainty if the container is being used as a filter here.
if not (contract_address := getattr(container.contract, "address", None)):
raise InvalidContainerTypeError(
"Please provider a contract event from a valid contract instance."
"Please provide a contract event from a valid contract instance."
)

labels["contract_address"] = contract_address
labels["event_signature"] = container.abi.signature

self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels))

if self.use_fork:
handler = self._with_fork_decorator(handler)

return self.broker.register_task(
handler,
task_name=handler.__name__,
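The new `_with_fork_decorator` in this file wraps every registered handler so it executes inside its own fork context and awaits async results before the fork is torn down. A standalone sketch of the same wrapping pattern follows; the `fork_context` stand-in and handler names are illustrative, not `NetworkManager.fork()` itself.

```py
import asyncio
import inspect
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable


@contextmanager
def fork_context():
    # Stand-in for `NetworkManager.fork()`: a real provider would create
    # (and later discard) an isolated fork of the connected network here.
    print("fork created")
    try:
        yield
    finally:
        print("fork discarded")


def with_fork(handler: Callable) -> Callable:
    """Wrap a sync or async handler so each call runs inside its own fork."""

    @wraps(handler)
    async def fork_handler(*args: Any, **kwargs: Any) -> Any:
        with fork_context():
            result = handler(*args, **kwargs)
            # Await async handlers before the fork context exits
            if inspect.isawaitable(result):
                return await result
            return result

    return fork_handler


@with_fork
async def handle_block(block_number: int) -> int:
    return block_number


print(asyncio.run(handle_block(1)))  # fork created / fork discarded / 1
```

Awaiting inside the `with` block is the key detail: it keeps the fork alive for the full lifetime of an async handler.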
4 changes: 4 additions & 0 deletions silverback/settings.py
@@ -28,6 +28,10 @@ class Settings(BaseSettings, ManagerAccessMixin):
# A unique identifier for this silverback instance
BOT_NAME: str = "bot"

# Execute every handler using an independent fork context
# NOTE: Requires fork-able provider installed and configured for network
FORK_MODE: bool = False

BROKER_CLASS: str = "taskiq:InMemoryBroker"
BROKER_URI: str = "" # To be deprecated in 0.6
BROKER_KWARGS: dict[str, Any] = dict()
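The new `FORK_MODE` flag is an ordinary `pydantic_settings` field, so it can be toggled from the environment like the other settings. A minimal sketch follows; the `SILVERBACK_` env prefix is an assumption for illustration, so check the real `Settings` class for the prefix it actually uses.

```py
import os

from pydantic_settings import BaseSettings, SettingsConfigDict


class ExampleSettings(BaseSettings):
    # `env_prefix` here is assumed, not taken from silverback's Settings class
    model_config = SettingsConfigDict(env_prefix="SILVERBACK_")

    FORK_MODE: bool = False  # mirrors the new flag added above


os.environ["SILVERBACK_FORK_MODE"] = "true"
print(ExampleSettings().FORK_MODE)  # True
```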
4 changes: 2 additions & 2 deletions silverback/subscriptions.py
@@ -5,7 +5,7 @@

from ape.logging import logger
from websockets import ConnectionClosedError
from websockets import client as ws_client
from websockets.asyncio import client as ws_client


class SubscriptionType(Enum):
@@ -26,7 +26,7 @@ def __init__(self, ws_provider_uri: str):
self._ws_provider_uri = ws_provider_uri

# Stateful
self._connection: ws_client.WebSocketClientProtocol | None = None
self._connection: ws_client.ClientConnection | None = None
self._last_request: int = 0
self._subscriptions: dict[str, asyncio.Queue] = {}
self._rpc_msg_buffer: list[dict] = []
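This change moves from the legacy `websockets.client` module to the newer `websockets.asyncio.client` API, matching the `websockets>=14.1,<15` pin added in `setup.py`. A minimal connect-and-subscribe sketch of the new API follows; the URI and subscription parameters are placeholders, not silverback's.

```py
import asyncio
import json

from websockets.asyncio import client as ws_client


async def main(uri: str = "wss://example.invalid/ws") -> None:
    # `connect()` yields a `ClientConnection`, the type now stored on
    # `self._connection` in place of the legacy `WebSocketClientProtocol`.
    async with ws_client.connect(uri) as connection:
        await connection.send(
            json.dumps(
                {"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newHeads"]}
            )
        )
        print(json.loads(await connection.recv()))


if __name__ == "__main__":
    asyncio.run(main())
```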
3 changes: 2 additions & 1 deletion silverback/worker.py
@@ -7,6 +7,7 @@


async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90):
shutdown_event = asyncio.Event()
try:
tasks = []
with ThreadPoolExecutor(max_workers=worker_count) as pool:
@@ -19,7 +20,7 @@ async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90):
max_prefetch=0,
)
broker.is_worker_process = True
tasks.append(receiver.listen())
tasks.append(receiver.listen(shutdown_event))

await asyncio.gather(*tasks)
finally:
Expand Down
