diff --git a/.env.sample b/.env.sample
index 30c8c807..3adeb977 100644
--- a/.env.sample
+++ b/.env.sample
@@ -28,3 +28,29 @@ ETHERSCAN_API_KEY=
 # Query ID for the aggregate query on Dune
 AGGREGATE_QUERY_ID=
+
+# Node for each chain we want to run a sync job on
+NODE_URL_ETHEREUM=
+NODE_URL_GNOSIS=
+NODE_URL_ARBITRUM=
+
+# The network on which we run a sync job.
+# Current options are: {"mainnet", "xdai", "arbitrum-one"}
+NETWORK=
+
+# The prefix of the dune table where we sync the various mainnet price feeds
+PRICE_FEED_TARGET_TABLE=
+
+# The prefix of the dune table where we sync the monthly raw batch data
+BATCH_DATA_TARGET_TABLE="batch_data"
+
+# Dune API request timeout parameter; we recommend setting it to 600
+DUNE_API_REQUEST_TIMEOUT=600
+
+# The following constants define the lower and upper caps for payments (in wei) per network
+EPSILON_LOWER_ETHEREUM=
+EPSILON_UPPER_ETHEREUM=
+EPSILON_LOWER_GNOSIS=
+EPSILON_UPPER_GNOSIS=
+EPSILON_LOWER_ARBITRUM=
+EPSILON_UPPER_ARBITRUM=
diff --git a/requirements/prod.txt b/requirements/prod.txt
index 4ab4c0d1..32a1ff67 100644
--- a/requirements/prod.txt
+++ b/requirements/prod.txt
@@ -7,3 +7,4 @@ ndjson>=0.3.1
 py-multiformats-cid>=0.4.4
 boto3>=1.26.12
 SQLAlchemy>=2.0,<3.0
+web3==6.20.3
diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py
index 01ce3f93..1de3b621 100644
--- a/src/fetch/orderbook.py
+++ b/src/fetch/orderbook.py
@@ -14,11 +14,12 @@
 from src.logger import set_log
 from src.models.block_range import BlockRange
-from src.utils import open_query
+from src.utils import open_query, node_suffix
 log = set_log(__name__)
 MAX_PROCESSING_DELAY = 10
+BUCKET_SIZE = {"mainnet": 10000, "xdai": 30000, "arbitrum-one": 1000000}
 class OrderbookEnv(Enum):
@@ -45,7 +46,14 @@ class OrderbookFetcher:
     def _pg_engine(db_env: OrderbookEnv) -> Engine:
         """Returns a connection to postgres database"""
         load_dotenv()
-        db_url = os.environ[f"{db_env}_DB_URL"]
+        if "NETWORK" in os.environ:
+            db_url = (
+                os.environ[f"{db_env}_DB_URL"]
+                + "/"
+                + os.environ.get("NETWORK", "mainnet")
+            )
+        else:
+            db_url = os.environ[f"{db_env}_DB_URL"]
         db_string = f"postgresql+psycopg2://{db_url}"
         return create_engine(db_string)
@@ -166,6 +174,87 @@ def get_batch_rewards(cls, block_range: BlockRange) -> DataFrame:
             return barn.copy()
         return pd.DataFrame()
+    @classmethod
+    def run_batch_data_sql(cls, block_range: BlockRange) -> DataFrame:
+        """
+        Fetches and validates Batch data DataFrame as concatenation from Prod and Staging DB
+        """
+        load_dotenv()
+        network = node_suffix(os.environ["NETWORK"])
+        epsilon_upper = str(os.environ[f"EPSILON_UPPER_{network}"])
+        epsilon_lower = str(os.environ[f"EPSILON_LOWER_{network}"])
+        batch_data_query_prod = (
+            open_query("orderbook/batch_data.sql")
+            .replace("{{start_block}}", str(block_range.block_from))
+            .replace("{{end_block}}", str(block_range.block_to))
+            .replace(
+                "{{EPSILON_LOWER}}", epsilon_lower
+            )  # lower ETH cap for payment (in WEI)
+            .replace(
+                "{{EPSILON_UPPER}}", epsilon_upper
+            )  # upper ETH cap for payment (in WEI)
+            .replace("{{env}}", "prod")
+        )
+        batch_data_query_barn = (
+            open_query("orderbook/batch_data.sql")
+            .replace("{{start_block}}", str(block_range.block_from))
+            .replace("{{end_block}}", str(block_range.block_to))
+            .replace(
+                "{{EPSILON_LOWER}}", epsilon_lower
+            )  # lower ETH cap for payment (in WEI)
+            .replace(
+                "{{EPSILON_UPPER}}", epsilon_upper
+            )  # upper ETH cap for payment (in WEI)
+            .replace("{{env}}", "barn")
+        )
+        data_types = {
+            # According to this: https://stackoverflow.com/a/11548224
+            # capitalized int64 means `Optional` and it appears to work.
+            "block_number": "Int64",
+            "block_deadline": "int64",
+        }
+        barn, prod = cls._query_both_dbs(
+            batch_data_query_prod, batch_data_query_barn, data_types
+        )
+
+        # Warn if solvers appear in both environments.
+        if not set(prod.solver).isdisjoint(set(barn.solver)):
+            log.warning(
+                f"solver overlap in {block_range}: solvers "
+                f"{set(prod.solver).intersection(set(barn.solver))} part of both prod and barn"
+            )
+
+        if not prod.empty and not barn.empty:
+            return pd.concat([prod, barn])
+        if not prod.empty:
+            return prod.copy()
+        if not barn.empty:
+            return barn.copy()
+        return pd.DataFrame()
+
+    @classmethod
+    def get_batch_data(cls, block_range: BlockRange) -> DataFrame:
+        """
+        Decomposes the block range into buckets whose size is given by
+        BUCKET_SIZE for the network, so as to ensure the batch data query
+        runs fast enough.
+        At the end, it concatenates everything into one data frame.
+        """
+        load_dotenv()
+        start = block_range.block_from
+        end = block_range.block_to
+        bucket_size = BUCKET_SIZE[os.environ.get("NETWORK", "mainnet")]
+        res = []
+        while start < end:
+            size = min(end - start, bucket_size)
+            log.info(f"About to process block range ({start}, {start + size})")
+            res.append(
+                cls.run_batch_data_sql(
+                    BlockRange(block_from=start, block_to=start + size)
+                )
+            )
+            start = start + size
+        return pd.concat(res)
+
     @classmethod
     def get_app_hashes(cls) -> DataFrame:
         """
diff --git a/src/main.py b/src/main.py
index 56b93812..faed5e6a 100644
--- a/src/main.py
+++ b/src/main.py
@@ -7,6 +7,7 @@
 from dotenv import load_dotenv
 from dune_client.client import DuneClient
+from web3 import Web3
 from src.fetch.orderbook import OrderbookFetcher
 from src.logger import set_log
@@ -14,8 +15,15 @@
 from src.post.aws import AWSClient
 from src.sync import sync_app_data
 from src.sync import sync_price_feed
-from src.sync.config import SyncConfig, AppDataSyncConfig, PriceFeedSyncConfig
+from src.sync.config import (
+    SyncConfig,
+    AppDataSyncConfig,
+    PriceFeedSyncConfig,
+    BatchDataSyncConfig,
+)
 from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards
+from src.sync.batch_data import sync_batch_data
+from src.utils import node_suffix
 log = set_log(__name__)
@@ -57,6 +65,11 @@ def main() -> None:
         request_timeout=float(os.environ.get("DUNE_API_REQUEST_TIMEOUT", 10)),
     )
     orderbook = OrderbookFetcher()
+    network = os.environ.get("NETWORK", "mainnet")
+    log.info(f"Network is set to: {network}")
+    web3 = Web3(
+        Web3.HTTPProvider(os.environ.get("NODE_URL_" + node_suffix(network)))
+    )
     if args.sync_table == SyncTable.APP_DATA:
         table = os.environ["APP_DATA_TARGET_TABLE"]
@@ -98,6 +111,18 @@ def main() -> None:
             fetcher=orderbook,
             dry_run=args.dry_run,
         )
+    elif args.sync_table == SyncTable.BATCH_DATA:
+        table = os.environ["BATCH_DATA_TARGET_TABLE"]
+        assert table, "BATCH DATA sync needs a BATCH_DATA_TARGET_TABLE env"
+        asyncio.run(
+            sync_batch_data(
+                web3,
+                orderbook,
+                dune=dune,
+                config=BatchDataSyncConfig(table),
+                dry_run=args.dry_run,
+            )
+        )
     else:
         log.error(f"unsupported sync_table '{args.sync_table}'")
diff --git a/src/models/tables.py b/src/models/tables.py
index 94dc69be..51bc4d59 100644
--- a/src/models/tables.py
+++ b/src/models/tables.py
@@ -8,6 +8,7 @@ class SyncTable(Enum):
     APP_DATA = "app_data"
     ORDER_REWARDS = "order_rewards"
     BATCH_REWARDS = "batch_rewards"
+    BATCH_DATA = "batch_data"
     INTERNAL_IMBALANCE = "internal_imbalance"
     PRICE_FEED = "price_feed"
diff --git a/src/sql/orderbook/batch_data.sql b/src/sql/orderbook/batch_data.sql
new file mode 100644
index 00000000..a008eda7
--- /dev/null
+++ b/src/sql/orderbook/batch_data.sql
@@ -0,0 +1,249 @@
+WITH observed_settlements AS (
+    SELECT
+        -- settlement
+        tx_hash,
+        solver,
+        s.block_number,
+        -- settlement_observations
+        effective_gas_price * gas_used AS execution_cost,
+        surplus,
+        s.auction_id
+    FROM
+        settlement_observations so
+        JOIN settlements s ON s.block_number = so.block_number
+        AND s.log_index = so.log_index
+        JOIN settlement_scores ss ON s.auction_id = ss.auction_id
+    WHERE
+        ss.block_deadline > {{start_block}}
+        AND ss.block_deadline <= {{end_block}}
+),
+-- order data
+order_data AS (
+    SELECT
+        uid,
+        sell_token,
+        buy_token,
+        sell_amount,
+        buy_amount,
+        kind,
+        app_data
+    FROM orders
+    UNION ALL
+    SELECT
+        uid,
+        sell_token,
+        buy_token,
+        sell_amount,
+        buy_amount,
+        kind,
+        app_data
+    FROM jit_orders
+),
+-- unprocessed trade data
+trade_data_unprocessed AS (
+    SELECT
+        ss.winner AS solver,
+        s.auction_id,
+        s.tx_hash,
+        t.order_uid,
+        od.sell_token,
+        od.buy_token,
+        t.sell_amount, -- the total amount the user sends
+        t.buy_amount, -- the total amount the user receives
+        oe.surplus_fee AS observed_fee, -- the total discrepancy between what the user sends and what they would have sent if they had traded at the clearing price
+        od.kind,
+        CASE
+            WHEN od.kind = 'sell' THEN od.buy_token
+            WHEN od.kind = 'buy' THEN od.sell_token
+        END AS surplus_token,
+        convert_from(ad.full_app_data, 'UTF8')::JSONB->'metadata'->'partnerFee'->>'recipient' AS partner_fee_recipient,
+        COALESCE(oe.protocol_fee_amounts[1], 0) AS first_protocol_fee_amount,
+        COALESCE(oe.protocol_fee_amounts[2], 0) AS second_protocol_fee_amount
+    FROM
+        settlements s
+        JOIN settlement_scores ss -- contains block_deadline
+        ON s.auction_id = ss.auction_id
+        JOIN trades t -- contains traded amounts
+        ON s.block_number = t.block_number -- given the join that follows with the order execution table, this works even when multiple txs appear in the same block
+        JOIN order_data od -- contains tokens and limit amounts
+        ON t.order_uid = od.uid
+        JOIN order_execution oe -- contains surplus fee
+        ON t.order_uid = oe.order_uid
+        AND s.auction_id = oe.auction_id
+        LEFT OUTER JOIN app_data ad -- contains full app data
+        ON od.app_data = ad.contract_app_data
+    WHERE
+        ss.block_deadline > {{start_block}}
+        AND ss.block_deadline <= {{end_block}}
+),
+-- processed trade data
+trade_data_processed AS (
+    SELECT
+        auction_id,
+        solver,
+        tx_hash,
+        order_uid,
+        sell_amount,
+        buy_amount,
+        sell_token,
+        observed_fee,
+        surplus_token,
+        second_protocol_fee_amount,
+        first_protocol_fee_amount + second_protocol_fee_amount AS protocol_fee,
+        partner_fee_recipient,
+        CASE
+            WHEN partner_fee_recipient IS NOT NULL THEN second_protocol_fee_amount
+            ELSE 0
+        END AS partner_fee,
+        surplus_token AS protocol_fee_token
+    FROM
+        trade_data_unprocessed
+),
+price_data AS (
+    SELECT
+        tdp.auction_id,
+        tdp.order_uid,
+        ap_surplus.price / pow(10, 18) AS surplus_token_native_price,
+        ap_protocol.price / pow(10, 18) AS protocol_fee_token_native_price,
+        ap_sell.price / pow(10, 18) AS network_fee_token_native_price
+    FROM
+        trade_data_processed AS tdp
+        LEFT OUTER JOIN auction_prices ap_sell -- contains price: sell token
+        ON tdp.auction_id = ap_sell.auction_id
+        AND tdp.sell_token = ap_sell.token
+        LEFT OUTER JOIN auction_prices ap_surplus -- contains price: surplus token
+        ON tdp.auction_id = ap_surplus.auction_id
+        AND tdp.surplus_token = ap_surplus.token
+        LEFT OUTER JOIN auction_prices ap_protocol -- contains price: protocol fee token
+        ON tdp.auction_id = ap_protocol.auction_id
+        AND tdp.surplus_token = ap_protocol.token
+),
+trade_data_processed_with_prices AS (
+    SELECT
+        tdp.auction_id,
+        tdp.solver,
+        tdp.tx_hash,
+        tdp.order_uid,
+        tdp.surplus_token,
+        tdp.protocol_fee,
+        tdp.protocol_fee_token,
+        tdp.partner_fee,
+        tdp.partner_fee_recipient,
+        CASE
+            WHEN tdp.sell_token != tdp.surplus_token THEN tdp.observed_fee - (tdp.sell_amount - tdp.observed_fee) / tdp.buy_amount * COALESCE(tdp.protocol_fee, 0)
+            ELSE tdp.observed_fee - COALESCE(tdp.protocol_fee, 0)
+        END AS network_fee,
+        tdp.sell_token AS network_fee_token,
+        surplus_token_native_price,
+        protocol_fee_token_native_price,
+        network_fee_token_native_price
+    FROM
+        trade_data_processed AS tdp
+        JOIN price_data pd
+        ON tdp.auction_id = pd.auction_id
+        AND tdp.order_uid = pd.order_uid
+),
+batch_protocol_fees AS (
+    SELECT
+        solver,
+        tx_hash,
+        sum(protocol_fee * protocol_fee_token_native_price) AS protocol_fee
+    FROM
+        trade_data_processed_with_prices
+    GROUP BY
+        solver,
+        tx_hash
+),
+batch_network_fees AS (
+    SELECT
+        solver,
+        tx_hash,
+        sum(network_fee * network_fee_token_native_price) AS network_fee
+    FROM
+        trade_data_processed_with_prices
+    GROUP BY
+        solver,
+        tx_hash
+),
+reward_data AS (
+    SELECT
+        -- observations
+        os.tx_hash,
+        ss.auction_id,
+        -- TODO - Assuming that `solver == winner` when both not null
+        -- We will need to monitor that `solver == winner`!
+        ss.winner as solver,
+        block_number as settlement_block,
+        block_deadline,
+        coalesce(execution_cost, 0) as execution_cost,
+        coalesce(surplus, 0) as surplus,
+        -- scores
+        winning_score,
+        case
+            when block_number is not null
+            and block_number <= block_deadline + 1 then winning_score -- this includes a grace period of one block for settling a batch
+            else 0
+        end as observed_score,
+        reference_score,
+        -- protocol_fees
+        coalesce(cast(protocol_fee as numeric(78, 0)), 0) as protocol_fee,
+        coalesce(
+            cast(network_fee as numeric(78, 0)),
+            0
+        ) as network_fee
+    FROM
+        settlement_scores ss
+        -- outer joins made in order to capture non-existent settlements.
+        LEFT OUTER JOIN observed_settlements os ON os.auction_id = ss.auction_id
+        LEFT OUTER JOIN batch_protocol_fees bpf ON bpf.tx_hash = os.tx_hash
+        LEFT OUTER JOIN batch_network_fees bnf ON bnf.tx_hash = os.tx_hash
+    WHERE
+        ss.block_deadline > {{start_block}}
+        AND ss.block_deadline <= {{end_block}}
+),
+reward_per_auction as (
+    SELECT
+        tx_hash,
+        auction_id,
+        settlement_block,
+        block_deadline,
+        solver,
+        execution_cost,
+        surplus,
+        protocol_fee, -- the protocol fee
+        network_fee, -- the network fee
+        observed_score - reference_score as uncapped_payment,
+        -- Capped Payment = CLAMP_[-EPSILON_LOWER, EPSILON_UPPER](uncapped_payment)
+        LEAST(
+            GREATEST(
+                - {{EPSILON_LOWER}},
+                observed_score - reference_score
+            ),
+            {{EPSILON_UPPER}}
+        ) as capped_payment,
+        winning_score,
+        reference_score
+    FROM
+        reward_data
+)
+SELECT
+    '{{env}}' as environment,
+    auction_id,
+    settlement_block as block_number,
+    block_deadline,
+    case
+        when tx_hash is NULL then NULL
+        else concat('0x', encode(tx_hash, 'hex'))
+    end as tx_hash,
+    concat('0x', encode(solver, 'hex')) as solver,
+    execution_cost :: text as execution_cost,
+    surplus :: text as surplus,
+    protocol_fee :: text as protocol_fee,
+    network_fee :: text as network_fee,
+    uncapped_payment :: text as uncapped_payment_eth,
+    capped_payment :: text as capped_payment,
+    winning_score :: text as winning_score,
+    reference_score :: text as reference_score
+FROM
+    reward_per_auction
+ORDER BY block_deadline
diff --git a/src/sync/batch_data.py b/src/sync/batch_data.py
new file mode 100644
index 00000000..6b8bd0e2
--- /dev/null
+++ b/src/sync/batch_data.py
@@ -0,0 +1,49 @@
+"""Main entry point for batch data sync"""
+import os
+from dotenv import load_dotenv
+from dune_client.client import DuneClient
+from web3 import Web3
+from src.fetch.orderbook import OrderbookFetcher
+from src.logger import set_log
+from src.sync.config import BatchDataSyncConfig
+from src.sync.common import compute_block_and_month_range
+from src.models.block_range import BlockRange
+from src.utils import node_suffix
+
+
+log = set_log(__name__)
+
+
+async def sync_batch_data(
+    node: Web3,
+    orderbook: OrderbookFetcher,
+    dune: DuneClient,
+    config: BatchDataSyncConfig,
+    dry_run: bool,
+) -> None:
+    """Batch data sync logic"""
+    load_dotenv()
+    network = os.environ["NETWORK"]
+    network_name = node_suffix(network).lower()
+
+    block_range_list, months_list = compute_block_and_month_range(node)
+    for i, _ in enumerate(block_range_list):
+        start_block = block_range_list[i][0]
+        end_block = block_range_list[i][1]
+        table_name = config.table + "_" + network_name + "_" + months_list[i]
+        block_range = BlockRange(block_from=start_block, block_to=end_block)
+        log.info(
+            f"About to process block range ({start_block}, {end_block}) for month {months_list[i]}"
+        )
+        batch_data = orderbook.get_batch_data(block_range)
+        log.info("SQL query successfully executed. About to initiate upload to Dune.")
+        if not dry_run:
+            dune.upload_csv(
+                data=batch_data.to_csv(index=False),
+                table_name=table_name,
+                description=config.description,
+                is_private=False,
+            )
+        log.info(
+            f"batch data sync run completed successfully for month {months_list[i]}"
+        )
diff --git a/src/sync/common.py b/src/sync/common.py
index c3a11195..205f0f11 100644
--- a/src/sync/common.py
+++ b/src/sync/common.py
@@ -1,5 +1,7 @@
 """Shared methods between both sync scripts."""
-
+from datetime import datetime, timezone
+from typing import List, Tuple
+from web3 import Web3
 from src.logger import set_log
 from src.models.tables import SyncTable
 from src.post.aws import AWSClient
@@ -18,3 +20,103 @@ def last_sync_block(aws: AWSClient, table: SyncTable, genesis_block: int = 0) -> int:
         block_from = genesis_block
     return block_from
+
+
+def find_block_with_timestamp(node: Web3, time_stamp: float) -> int:
+    """
+    This implements binary search and returns the smallest block number
+    whose timestamp is at least as large as the time_stamp argument passed to the function
+    """
+    end_block_number = int(node.eth.get_block("finalized")["number"])
+    start_block_number = 1
+    close_in_seconds = 30
+
+    while True:
+        mid_block_number = (start_block_number + end_block_number) // 2
+        block = node.eth.get_block(mid_block_number)
+        block_time = block["timestamp"]
+        difference_in_seconds = int((time_stamp - block_time))
+
+        if abs(difference_in_seconds) < close_in_seconds:
+            break
+
+        if difference_in_seconds < 0:
+            end_block_number = mid_block_number - 1
+        else:
+            start_block_number = mid_block_number + 1
+
+    # we now brute-force search to ensure we have found the right block
+    for b in range(mid_block_number - 200, mid_block_number + 200):
+        block = node.eth.get_block(b)
+        block_time_stamp = block["timestamp"]
+        if block_time_stamp >= time_stamp:
+            return int(block["number"])
+    # fallback in case the correct block number hasn't been found;
+    # in that case, we will include some more blocks than necessary
+    return mid_block_number + 200
+
+
+def compute_block_and_month_range(  # pylint: disable=too-many-locals
+    node: Web3,
+) -> Tuple[List[Tuple[int, int]], List[str]]:
+    """
+    This determines the block range and the relevant months
+    for which we will compute and upload data on Dune.
+ """ + # We first compute the relevant block range + # Here, we assume that the job runs at least once every 24h + # Because of that, if it is the first day of month, we also + # compute the previous month's table just to be on the safe side + + latest_finalized_block = node.eth.get_block("finalized") + + current_month_end_block = int(latest_finalized_block["number"]) + current_month_end_timestamp = latest_finalized_block["timestamp"] + + current_month_end_datetime = datetime.fromtimestamp( + current_month_end_timestamp, tz=timezone.utc + ) + current_month_start_datetime = datetime( + current_month_end_datetime.year, current_month_end_datetime.month, 1, 00, 00 + ) + current_month_start_timestamp = current_month_start_datetime.replace( + tzinfo=timezone.utc + ).timestamp() + + current_month_start_block = find_block_with_timestamp( + node, current_month_start_timestamp + ) + + current_month = ( + f"{current_month_end_datetime.year}_{current_month_end_datetime.month}" + ) + months_list = [current_month] + block_range = [(current_month_start_block, current_month_end_block)] + if current_month_end_datetime.day == 1: + if current_month_end_datetime.month == 1: + previous_month = f"{current_month_end_datetime.year - 1}_12" + previous_month_start_datetime = datetime( + current_month_end_datetime.year - 1, 12, 1, 00, 00 + ) + else: + previous_month = f"""{current_month_end_datetime.year}_ + {current_month_end_datetime.month - 1} + """ + previous_month_start_datetime = datetime( + current_month_end_datetime.year, + current_month_end_datetime.month - 1, + 1, + 00, + 00, + ) + months_list.append(previous_month) + previous_month_start_timestamp = previous_month_start_datetime.replace( + tzinfo=timezone.utc + ).timestamp() + previous_month_start_block = find_block_with_timestamp( + node, previous_month_start_timestamp + ) + previous_month_end_block = current_month_start_block + block_range.append((previous_month_start_block, previous_month_end_block)) + + return block_range, months_list diff --git a/src/sync/config.py b/src/sync/config.py index 3be39788..6a84d3fb 100644 --- a/src/sync/config.py +++ b/src/sync/config.py @@ -40,3 +40,13 @@ class PriceFeedSyncConfig: description: str = ( "Table containing prices and timestamps from multiple price feeds" ) + + +@dataclass +class BatchDataSyncConfig: + """Configuration for batch data sync.""" + + # The name of the table to upload to + table: str = "batch_data_test" + # Description of the table (for creation) + description: str = "Table containing raw batch data" diff --git a/src/utils.py b/src/utils.py index a601e8c9..409452d9 100644 --- a/src/utils.py +++ b/src/utils.py @@ -13,3 +13,16 @@ def open_query(filename: str) -> str: def query_file(filename: str) -> str: """Returns proper path for filename in QUERY_PATH""" return os.path.join(QUERY_PATH, filename) + + +def node_suffix(network: str) -> str: + """ + Converts network internal name to name used for nodes and dune tables + """ + if network == "mainnet": + return "ETHEREUM" + if network == "xdai": + return "GNOSIS" + if network == "arbitrum-one": + return "ARBITRUM" + return ""