Affiliate Data API #21
Changes from 6 commits: 1f2ea6b, 78a68ce, 755334d, ae59b2f, 4930caf, caf140d, 6a53a84
```
@@ -1,3 +1,4 @@
Flask>=2.2.2
dune-client>=0.3.0
psycopg2-binary>=2.9.3
python-dotenv>=0.20.0
```
@@ -0,0 +1,35 @@

```python
"""Basic Flask API"""
import os
from typing import Any

from dotenv import load_dotenv
from dune_client.client import DuneClient
from flask import Flask, jsonify

from src.fetch.affiliate_data import CachingAffiliateFetcher
from src.logger import set_log

app = Flask(__name__)
load_dotenv()
cached_fetcher = CachingAffiliateFetcher(
    dune=DuneClient(os.environ["DUNE_API_KEY"]),
    execution_id=os.environ.get("LAST_EXECUTION_ID"),
    cache_validity=int(os.environ["CACHE_VALIDITY"]),
)
log = set_log(__name__)


@app.route("/profile/<string:address>", methods=["GET"])
def get_profile(address: str) -> Any:
    """
    Fetches Affiliate Profile for `address`
    https://api.cow.fi/affiliate/api/v1/profile/<address>
    """
    log.info(f"Fetching affiliate data for {address}")
    affiliate_data = cached_fetcher.get_affiliate_data(account=address)
    log.debug(f"got results {affiliate_data}")
    return jsonify(affiliate_data)


if __name__ == "__main__":
    app.run()
```
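For a quick local smoke test, Flask's built-in test client can exercise the endpoint without a running server. A minimal sketch, assuming the module above is importable as `app` and that `DUNE_API_KEY` and `CACHE_VALIDITY` are set in the environment:

```python
# Hypothetical smoke test using Flask's built-in test client.
# Assumes DUNE_API_KEY and CACHE_VALIDITY are set in the environment
# and that the API module above is importable as `app`.
from app import app

with app.test_client() as client:
    resp = client.get("/profile/0x0d5dc686d0a2abbfdafdfb4d0533e886517d4e83")
    print(resp.status_code, resp.get_json())
```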
@@ -0,0 +1,129 @@

```python
"""Caching Dune affiliate data fetcher"""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

from dune_client.client import DuneClient
from dune_client.query import Query
from dune_client.types import DuneRecord

from src.logger import set_log
from src.utils import utc_now, valid_address

AFFILIATE_QUERY_ID = 1757231
log = set_log(__name__)


@dataclass
class AffiliateData:  # pylint: disable=invalid-name
    """Represents the affiliate API return type"""

    totalTrades: int
    tradeVolumeUsd: float
    totalReferrals: int
    referralVolumeUsd: float
    lastUpdated: Optional[datetime]

    @classmethod
    def from_dict(cls, data_dict: DuneRecord, last_update: datetime) -> AffiliateData:
        """Constructor of data class from DuneRecord or any dict with compatible types"""
        return cls(
            totalTrades=int(data_dict["total_trades"]),
            totalReferrals=int(data_dict["total_referrals"]),
            tradeVolumeUsd=float(data_dict["trade_volume_usd"]),
            referralVolumeUsd=float(data_dict["referral_volume_usd"]),
            lastUpdated=last_update,
        )

    @classmethod
    def default(cls) -> AffiliateData:
        """Default return type for NotFound affiliates"""
        return cls(
            totalTrades=0,
            totalReferrals=0,
            tradeVolumeUsd=0.0,
            referralVolumeUsd=0.0,
            lastUpdated=None,
        )


@dataclass
class AffiliateMemory:
    """Indexed affiliate data with knowledge of the data's age"""

    data: dict[str, AffiliateData]
    last_update: datetime
    last_execution_id: str


class CachingAffiliateFetcher:
    """
    Wraps a DuneClient and a logger for convenient Dune fetching,
    caching affiliate results in memory.
    """

    def __init__(
        self, dune: DuneClient, execution_id: Optional[str], cache_validity: float = 30
    ) -> None:
        """
        Class constructor.
        - `dune`: the DuneClient used to fetch results.
        - `execution_id` can be used to avoid initial fetching if known.
        - `cache_validity` is in minutes.
        """
        log.info("loading affiliate data cache")
        self.dune = dune
        self.memory: AffiliateMemory = self._fetch_affiliates(execution_id)
        self.cache_validity = timedelta(minutes=cache_validity)
        log.info("affiliate data cache loaded")

    def _fetch_affiliates(self, execution_id: Optional[str] = None) -> AffiliateMemory:
        """Fetches affiliate data from Dune and updates the cache"""
        if execution_id:
            results = self.dune.get_result(execution_id)
        else:
            # https://dune.com/queries/1757231?d=1
            # Query executes in ~24 seconds.
            results = self.dune.refresh(Query(AFFILIATE_QUERY_ID), ping_frequency=10)

        log.info(f"query execution {results.execution_id} succeeded")

        last_update = results.times.execution_ended_at or utc_now()
```
Review thread on `last_update = results.times.execution_ended_at or utc_now()`:

- Why do we need the `or` statement? Wouldn't it be better to throw an error?
- This is because …
- I think an error would be best practice, to know if something did not go well.
- Although we would already have errored on the refresh if this field was not available. I will also error here if it happens. Will do.
- I think I would rather set a warning here and use `utc_now`, because it makes sense and it's not clear what to do otherwise. We definitely don't want the app to crash. I could also log an error (so we know about it), but not crash.
- In conclusion, I don't think we should "throw" an error, but rather "log" an error (so we are informed, but the app doesn't crash). This would lead us to notice and report a bug in Dune.
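A minimal sketch of the behaviour agreed on above (log an error but fall back to `utc_now()` rather than crashing), using only the names already defined in this module:

```python
# Sketch of the agreed behaviour: fall back to utc_now() when
# execution_ended_at is missing, but log an error so a possible
# Dune bug is noticed and can be reported.
last_update = results.times.execution_ended_at
if last_update is None:
    log.error(
        f"execution {results.execution_id} missing execution_ended_at; "
        "falling back to utc_now()"
    )
    last_update = utc_now()
```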
```python
        return AffiliateMemory(
            last_update=last_update,
            last_execution_id=results.execution_id,
            data={
                row["trader"]: AffiliateData.from_dict(row, last_update=last_update)
                for row in results.get_rows()
            },
        )

    def _update_memory(self) -> None:
        """Internal method to update the cached affiliate data"""
        self.memory = self._fetch_affiliates()

    def cache_expired(self) -> bool:
        """Determines if the data cache is expired."""
        before = self.memory.last_update
        delta = self.cache_validity
        return utc_now() > before + delta

    def get_affiliate_data(self, account: str) -> AffiliateData:
        """
        Returns affiliate data for `account`.
        If the record on hand is considered "too old",
        refreshes memory before return.
        """
        if not valid_address(account):
            log.warning(f"Invalid address received {account}")

        if self.cache_expired():
            log.debug("cache expired - refreshing results")
            self._update_memory()
```
Review thread on the expired-cache refresh above:

- Does this mean that the API query could take 30 secs? We should check with the front-end whether this is okay.
- I would prefer to update in the background a bit less frequently. The original idea I had for this was that we refresh whenever a request comes in AND the cache is expired, but we return the value we currently have. This way we don't need scheduled refreshes; demand triggers an update whenever necessary. Furthermore, nobody would ever have to wait, because we always return something. I still need to figure out how exactly to implement this, but would prefer to do it in a follow-up PR (as it is a more sophisticated feature).
- But if we would ever like to serve two requests at the same time in parallel, then it could be that both trigger Dune to recalculate it, right? Maybe it's not so important, as we will not get there soon. Not sure how we would handle it. What do you dislike about my proposal? Is it cost saving?
- I am okay to do it in another PR, sure.
- Yea... the only thing about the proposal I am not fond of is the 5-minute updates. Would prefer something like 30. I also don't know how to implement the update in parallel with the service at this moment.
- Important to remember that prices may not even become available so frequently.
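One possible shape for the serve-stale-and-refresh-in-background idea discussed above. This is a sketch only, not part of this PR; the lock handling and the helper name are assumptions:

```python
import threading

# Hypothetical sketch: serve current data immediately and let at most one
# background thread refresh an expired cache, so parallel requests do not
# all trigger a Dune re-execution.
_refresh_lock = threading.Lock()


def refresh_in_background(fetcher: CachingAffiliateFetcher) -> None:
    """Kick off a cache refresh unless one is already running."""
    if fetcher.cache_expired() and _refresh_lock.acquire(blocking=False):

        def worker() -> None:
            try:
                fetcher._update_memory()  # pylint: disable=protected-access
            finally:
                _refresh_lock.release()

        threading.Thread(target=worker, daemon=True).start()
```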
```python
        clean_address = account.lower()
        result = self.memory.data.get(clean_address, AffiliateData.default())
        log.debug(f"Found result for {account}: {result}")
        return result
```
@@ -0,0 +1,38 @@

```python
import os
import unittest

from dune_client.client import DuneClient

from src.fetch.affiliate_data import (
    CachingAffiliateFetcher,
)


class TestCachingAffiliateFetcher(unittest.TestCase):
    """This test requires a valid API key"""

    def setUp(self) -> None:
        self.dune = DuneClient(os.environ["DUNE_API_KEY"])

    def test_affiliate_fetching(self):
        fetcher = CachingAffiliateFetcher(self.dune, execution_id=None)
        known_trader_and_referrer = "0x0d5dc686d0a2abbfdafdfb4d0533e886517d4e83"
        affiliate_data = fetcher.get_affiliate_data(known_trader_and_referrer)
        # As of January 17, 2023
        self.assertGreater(affiliate_data.totalTrades, 241)
        self.assertGreater(affiliate_data.totalReferrals, 7)
        self.assertGreater(affiliate_data.tradeVolumeUsd, 75687577)
        self.assertGreater(affiliate_data.referralVolumeUsd, 75788291)

    def test_affiliate_fetching_deterministic(self):
        fetcher = CachingAffiliateFetcher(
            self.dune, execution_id="01GQ0YASEDSE1W7MENGTT1Q3AJ"
        )
        known_trader_and_referrer = "0x0d5dc686d0a2abbfdafdfb4d0533e886517d4e83"
        affiliate_data = fetcher.get_affiliate_data(known_trader_and_referrer)

        self.assertEqual(affiliate_data.totalTrades, 242)
        self.assertEqual(affiliate_data.totalReferrals, 8)
        self.assertAlmostEqual(affiliate_data.tradeVolumeUsd, 75687577.26913233)
        self.assertAlmostEqual(affiliate_data.referralVolumeUsd, 75788291.13699351)
```
Review comment on the caching design:

You should be able to use Flask's APScheduler to trigger a cache refresh periodically (e.g. once every 5 minutes): https://github.com/viniciuschiele/flask-apscheduler/blob/master/examples/jobs.py

This would allow us to get rid of cache invalidation altogether (which should simplify the code). Basically, the cache never becomes invalid; it only gets updated periodically (and serves whatever data is current). We should add a Prometheus metric on the last-updated timestamp and alert if the update wasn't triggered for too long.

This design will also make sure there is only ever one thing writing the cache, which means we have to worry much less about concurrency.
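A sketch of that suggestion using flask-apscheduler's decorator style. The task id, the 5-minute interval taken from the comment, and calling the fetcher's private update method are assumptions; `app` and `cached_fetcher` refer to the Flask module above:

```python
from flask_apscheduler import APScheduler

scheduler = APScheduler()


# Hypothetical wiring: rebuild the cache every 5 minutes so request
# handlers always serve whatever data is current without blocking on Dune.
@scheduler.task("interval", id="refresh_affiliate_cache", minutes=5)
def refresh_affiliate_cache() -> None:
    cached_fetcher._update_memory()  # pylint: disable=protected-access


scheduler.init_app(app)
scheduler.start()
```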