Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Introduce new logic for syncing raw batch data #140

Open
wants to merge 26 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,29 @@ ETHERSCAN_API_KEY=

# Query ID for the aggregate query on Dune
AGGREGATE_QUERY_ID=

# Node for each chain we want to run a sync job on
NODE_URL_ETHEREUM=
NODE_URL_GNOSIS=
NODE_URL_ARBITRUM=

# The network which we run a sync job on.
# Current options are: {"mainnet", "xdai", "arbitrum-one"}
NETWORK=

# The prefix of the Dune table to which the various mainnet price feeds are synced
PRICE_FEED_TARGET_TABLE=

# The prefix of the dune table where we sync the monthly raw batch data
BATCH_DATA_TARGET_TABLE="batch_data"

# Dune API request timeout in seconds; the recommended value is 600
DUNE_API_REQUEST_TIMEOUT=600

# The following constants define the upper and lower caps per network
EPSILON_LOWER_ETHEREUM=
EPSILON_UPPER_ETHEREUM=
EPSILON_LOWER_GNOSIS=
EPSILON_UPPER_GNOSIS=
EPSILON_LOWER_ARBITRUM=
EPSILON_UPPER_ARBITRUM=
1 change: 1 addition & 0 deletions requirements/prod.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ ndjson>=0.3.1
py-multiformats-cid>=0.4.4
boto3>=1.26.12
SQLAlchemy>=2.0,<3.0
web3==6.20.3
93 changes: 91 additions & 2 deletions src/fetch/orderbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

from src.logger import set_log
from src.models.block_range import BlockRange
from src.utils import open_query
from src.utils import open_query, node_suffix

log = set_log(__name__)

MAX_PROCESSING_DELAY = 10
BUCKET_SIZE = {"mainnet": 10000, "xdai": 30000, "arbitrum-one": 1000000}


class OrderbookEnv(Enum):
Expand All @@ -45,7 +46,14 @@ class OrderbookFetcher:
def _pg_engine(db_env: OrderbookEnv) -> Engine:
    """Return a SQLAlchemy engine connected to the orderbook postgres DB.

    The base connection string is read from the ``{db_env}_DB_URL``
    environment variable. When ``NETWORK`` is set (and non-empty) it is
    appended as the database name (``<url>/<network>``); otherwise the URL
    is used as-is, so deployments whose secrets have not been migrated to
    the new per-network scheme keep working unchanged.
    """
    load_dotenv()
    db_url = os.environ[f"{db_env}_DB_URL"]
    # Single lookup instead of `in os.environ` + `.get(..., "mainnet")`;
    # an empty NETWORK value is treated as unset to avoid a trailing "/".
    network = os.environ.get("NETWORK")
    if network:
        db_url = f"{db_url}/{network}"
    return create_engine(f"postgresql+psycopg2://{db_url}")

Expand Down Expand Up @@ -166,6 +174,87 @@ def get_batch_rewards(cls, block_range: BlockRange) -> DataFrame:
return barn.copy()
return pd.DataFrame()

@classmethod
def run_batch_data_sql(cls, block_range: BlockRange) -> DataFrame:
    """
    Fetch and validate the batch-data DataFrame as the concatenation of the
    results from the prod and barn (staging) databases.

    The per-network payment caps are read from the
    ``EPSILON_LOWER_<NETWORK>`` / ``EPSILON_UPPER_<NETWORK>`` environment
    variables. Returns an empty DataFrame when neither environment has data.
    """
    load_dotenv()
    network = node_suffix(os.environ["NETWORK"])
    epsilon_upper = str(os.environ[f"EPSILON_UPPER_{network}"])
    epsilon_lower = str(os.environ[f"EPSILON_LOWER_{network}"])

    def render_query(env: str) -> str:
        # Template the shared SQL once per environment; EPSILON_* are the
        # lower/upper ETH caps for payment (in WEI).
        return (
            open_query("orderbook/batch_data.sql")
            .replace("{{start_block}}", str(block_range.block_from))
            .replace("{{end_block}}", str(block_range.block_to))
            .replace("{{EPSILON_LOWER}}", epsilon_lower)
            .replace("{{EPSILON_UPPER}}", epsilon_upper)
            .replace("{{env}}", env)
        )

    data_types = {
        # According to this: https://stackoverflow.com/a/11548224
        # capitalized Int64 means `Optional<Integer>` and it appears to work.
        "block_number": "Int64",
        "block_deadline": "int64",
    }
    barn, prod = cls._query_both_dbs(
        render_query("prod"), render_query("barn"), data_types
    )

    # Warn if a solver appears in both environments.
    if not set(prod.solver).isdisjoint(set(barn.solver)):
        log.warning(
            f"solver overlap in {block_range}: solvers "
            f"{set(prod.solver).intersection(set(barn.solver))} part of both prod and barn"
        )

    if not prod.empty and not barn.empty:
        return pd.concat([prod, barn])
    if not prod.empty:
        return prod.copy()
    if not barn.empty:
        return barn.copy()
    return pd.DataFrame()

@classmethod
def get_batch_data(cls, block_range: BlockRange) -> DataFrame:
    """
    Decompose the block range into buckets (network-dependent size, see
    BUCKET_SIZE) so that each batch-data query runs fast enough, then
    concatenate the per-bucket results into one data frame.

    Returns an empty DataFrame for an empty (or inverted) block range.
    """
    load_dotenv()
    start = block_range.block_from
    end = block_range.block_to
    bucket_size = BUCKET_SIZE[os.environ.get("NETWORK", "mainnet")]
    res = []
    while start < end:
        size = min(end - start, bucket_size)
        log.info(f"About to process block range ({start}, {start + size})")
        res.append(
            cls.run_batch_data_sql(
                BlockRange(block_from=start, block_to=start + size)
            )
        )
        start = start + size
    # pd.concat raises ValueError on an empty list; guard against a range
    # where block_from >= block_to.
    if not res:
        return pd.DataFrame()
    return pd.concat(res)

@classmethod
def get_app_hashes(cls) -> DataFrame:
"""
Expand Down
27 changes: 26 additions & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,23 @@

from dotenv import load_dotenv
from dune_client.client import DuneClient
from web3 import Web3

from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.models.tables import SyncTable
from src.post.aws import AWSClient
from src.sync import sync_app_data
from src.sync import sync_price_feed
from src.sync.config import SyncConfig, AppDataSyncConfig, PriceFeedSyncConfig
from src.sync.config import (
SyncConfig,
AppDataSyncConfig,
PriceFeedSyncConfig,
BatchDataSyncConfig,
)
from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards
from src.sync.batch_data import sync_batch_data
from src.utils import node_suffix

log = set_log(__name__)

Expand Down Expand Up @@ -57,6 +65,11 @@ def main() -> None:
request_timeout=float(os.environ.get("DUNE_API_REQUEST_TIMEOUT", 10)),
)
orderbook = OrderbookFetcher()
network = os.environ.get("NETWORK", "mainnet")
log.info(f"Network is set to: {network}")
web3 = Web3(
Web3.HTTPProvider(os.environ.get("NODE_URL" + "_" + node_suffix(network)))
)

if args.sync_table == SyncTable.APP_DATA:
table = os.environ["APP_DATA_TARGET_TABLE"]
Expand Down Expand Up @@ -98,6 +111,18 @@ def main() -> None:
fetcher=orderbook,
dry_run=args.dry_run,
)
elif args.sync_table == SyncTable.BATCH_DATA:
table = os.environ["BATCH_DATA_TARGET_TABLE"]
harisang marked this conversation as resolved.
Show resolved Hide resolved
assert table, "BATCH DATA sync needs a BATCH_DATA_TARGET_TABLE env"
asyncio.run(
sync_batch_data(
web3,
orderbook,
dune=dune,
config=BatchDataSyncConfig(table),
dry_run=args.dry_run,
)
)
else:
log.error(f"unsupported sync_table '{args.sync_table}'")

Expand Down
1 change: 1 addition & 0 deletions src/models/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class SyncTable(Enum):
APP_DATA = "app_data"
ORDER_REWARDS = "order_rewards"
BATCH_REWARDS = "batch_rewards"
BATCH_DATA = "batch_data"
INTERNAL_IMBALANCE = "internal_imbalance"
PRICE_FEED = "price_feed"

Expand Down
Loading
Loading