Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Sync price feed #125

Merged
merged 8 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/fetch/orderbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class OrderbookEnv(Enum):

BARN = "BARN"
PROD = "PROD"
ANALYTICS = "ANALYTICS"

def __str__(self) -> str:
return str(self.value)
Expand Down Expand Up @@ -175,3 +176,11 @@ def get_app_hashes(cls) -> DataFrame:

# We are only interested in unique app data
return pd.concat([prod, barn]).drop_duplicates().reset_index(drop=True)

@classmethod
def get_price_feed(cls) -> DataFrame:
"""
Fetches prices from multiple price feeds from the analytics db
"""
prices_query = open_query("prices.sql")
return cls._read_query_for_env(prices_query, OrderbookEnv.ANALYTICS)
19 changes: 17 additions & 2 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from src.models.tables import SyncTable
from src.post.aws import AWSClient
from src.sync import sync_app_data
from src.sync.config import SyncConfig, AppDataSyncConfig
from src.sync import sync_price_feed
from src.sync.config import SyncConfig, AppDataSyncConfig, PriceFeedSyncConfig
from src.sync.order_rewards import sync_order_rewards, sync_batch_rewards

log = set_log(__name__)
Expand Down Expand Up @@ -48,7 +49,10 @@ def __init__(self) -> None:
if __name__ == "__main__":
load_dotenv()
args = ScriptArgs()
dune = DuneClient(os.environ["DUNE_API_KEY"])
dune = DuneClient(
api_key=os.environ["DUNE_API_KEY"],
request_timeout=float(os.environ.get("DUNE_API_REQUEST_TIMEOUT", 10)),
)
orderbook = OrderbookFetcher()

if args.sync_table == SyncTable.APP_DATA:
Expand All @@ -62,6 +66,17 @@ def __init__(self) -> None:
dry_run=args.dry_run,
)
)
elif args.sync_table == SyncTable.PRICE_FEED:
table = os.environ["PRICE_FEED_TARGET_TABLE"]
assert table, "PRICE FEED sync needs a PRICE_FEED_TARGET_TABLE env"
asyncio.run(
sync_price_feed(
orderbook,
dune=dune,
config=PriceFeedSyncConfig(table),
dry_run=args.dry_run,
)
)
elif args.sync_table == SyncTable.ORDER_REWARDS:
aws = AWSClient.new_from_environment()
volume_path = Path(os.environ["VOLUME_PATH"])
Expand Down
1 change: 1 addition & 0 deletions src/models/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class SyncTable(Enum):
ORDER_REWARDS = "order_rewards"
BATCH_REWARDS = "batch_rewards"
INTERNAL_IMBALANCE = "internal_imbalance"
PRICE_FEED = "price_feed"

def __str__(self) -> str:
return str(self.value)
Expand Down
8 changes: 8 additions & 0 deletions src/sql/prices.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Selects all prices collected in the analytics db
SELECT
concat('0x', encode(p.token_address, 'hex')) as token_address,
p.time,
p.price,
td.decimals,
p.source
FROM prices p INNER JOIN token_decimals td ON p.token_address = td.token_address
1 change: 1 addition & 0 deletions src/sync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Re-exported sync methods."""
from .app_data import sync_app_data
from .price_feed import sync_price_feed
12 changes: 12 additions & 0 deletions src/sync/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,15 @@ class AppDataSyncConfig:
description: str = (
"Table containing known CoW Protocol appData hashes and their pre-images"
)


@dataclass
class PriceFeedSyncConfig:
"""Configuration for price feed sync."""

# The name of the table to upload to
table: str = "price_feed_test"
# Description of the table (for creation)
description: str = (
"Table containing prices and timestamps from multiple price feeds"
)
27 changes: 27 additions & 0 deletions src/sync/price_feed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Main Entry point for price feed sync"""

from dune_client.client import DuneClient

from src.fetch.orderbook import OrderbookFetcher
from src.logger import set_log
from src.sync.config import PriceFeedSyncConfig

log = set_log(__name__)


async def sync_price_feed(
orderbook: OrderbookFetcher,
dune: DuneClient,
config: PriceFeedSyncConfig,
dry_run: bool,
) -> None:
"""Price Feed Sync Logic"""
prices = orderbook.get_price_feed()
if not dry_run:
dune.upload_csv(
data=prices.to_csv(index=False),
table_name=config.table,
description=config.description,
is_private=False,
)
log.info("price feed sync run completed successfully")
Loading