This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Affiliate Data API #21

Closed
wants to merge 7 commits
Changes from 6 commits
1 change: 1 addition & 0 deletions .github/workflows/pull-request.yaml
@@ -47,3 +47,4 @@ jobs:
env:
PROD_DB_URL: ${{ secrets.PROD_DB_URL }}
BARN_DB_URL: ${{ secrets.BARN_DB_URL }}
DUNE_API_KEY: ${{ secrets.DUNE_API_KEY }}
1 change: 1 addition & 0 deletions requirements/prod.txt
@@ -1,3 +1,4 @@
Flask>=2.2.2
dune-client>=0.3.0
psycopg2-binary>=2.9.3
python-dotenv>=0.20.0
Empty file added src/api/__init__.py
35 changes: 35 additions & 0 deletions src/api/affiliate.py
@@ -0,0 +1,35 @@
"""Basic Flask API"""
import os
from typing import Any

from dotenv import load_dotenv
from dune_client.client import DuneClient
from flask import Flask, jsonify

from src.fetch.affiliate_data import CachingAffiliateFetcher
from src.logger import set_log

app = Flask(__name__)
load_dotenv()
cached_fetcher = CachingAffiliateFetcher(
Contributor:

You should be able to use Flask's APScheduler to trigger a cache refresh periodically (e.g. once every 5 minutes): https://github.com/viniciuschiele/flask-apscheduler/blob/master/examples/jobs.py

This would allow us to get rid of cache invalidation altogether (which should simplify the code). Basically the cache never becomes invalid; it only gets updated periodically (and serves whatever data is current). We should add a Prometheus metric on the last-updated timestamp and alert if the update wasn't triggered for too long.

This design also makes sure there is only ever one thing writing the cache, which means we have to worry much less about concurrency.
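
A minimal sketch of this suggestion, assuming flask-apscheduler is installed; the job id, the 5-minute interval, and the direct call to the fetcher's _update_memory are illustrative, not part of this PR:

from flask import Flask
from flask_apscheduler import APScheduler

from src.fetch.affiliate_data import CachingAffiliateFetcher

app = Flask(__name__)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

cached_fetcher: CachingAffiliateFetcher  # constructed as in affiliate.py above


@scheduler.task("interval", id="refresh_affiliate_cache", minutes=5)
def refresh_affiliate_cache() -> None:
    # Single scheduled writer; request handlers only ever read the cache,
    # so no invalidation logic is needed.
    cached_fetcher._update_memory()  # pylint: disable=protected-access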

    dune=DuneClient(os.environ["DUNE_API_KEY"]),
    execution_id=os.environ.get("LAST_EXECUTION_ID"),
    cache_validity=int(os.environ["CACHE_VALIDITY"]),
)
log = set_log(__name__)


@app.route("/profile/<string:address>", methods=["GET"])
def get_profile(address: str) -> Any:
"""
Fetches Affiliate Profile for `address`
https://api.cow.fi/affiliate/api/v1/profile/<address>
"""
log.info(f"Fetching affiliate data for {address}")
affiliate_data = cached_fetcher.get_affiliate_data(account=address)
log.debug(f"got results {affiliate_data}")
    return jsonify(affiliate_data)


if __name__ == "__main__":
    app.run()
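
For reference, a minimal way to exercise this endpoint with Flask's test client (illustrative; assumes DUNE_API_KEY and CACHE_VALIDITY are set so the module-level fetcher can initialize):

from src.api.affiliate import app

client = app.test_client()
resp = client.get("/profile/0x0d5dc686d0a2abbfdafdfb4d0533e886517d4e83")
print(resp.status_code, resp.get_json())  # expect 200 and the serialized AffiliateData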
129 changes: 129 additions & 0 deletions src/fetch/affiliate_data.py
@@ -0,0 +1,129 @@
"""Caching Dune affiliate data fetcher"""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

from dune_client.client import DuneClient
from dune_client.query import Query
from dune_client.types import DuneRecord

from src.logger import set_log
from src.utils import utc_now, valid_address

AFFILIATE_QUERY_ID = 1757231
log = set_log(__name__)


@dataclass
class AffiliateData: # pylint: disable=invalid-name
"""Represents the affiliate API return type"""

totalTrades: int
tradeVolumeUsd: float
totalReferrals: int
referralVolumeUsd: float
lastUpdated: Optional[datetime]

@classmethod
def from_dict(cls, data_dict: DuneRecord, last_update: datetime) -> AffiliateData:
"""Constructor of data class from DuneRecord or any dict with compatible types"""
return cls(
totalTrades=int(data_dict["total_trades"]),
totalReferrals=int(data_dict["total_referrals"]),
tradeVolumeUsd=float(data_dict["trade_volume_usd"]),
referralVolumeUsd=float(data_dict["referral_volume_usd"]),
lastUpdated=last_update,
)

    @classmethod
    def default(cls) -> AffiliateData:
        """Default return type for NotFound affiliates"""
        return cls(
            totalTrades=0,
            totalReferrals=0,
            tradeVolumeUsd=0.0,
            referralVolumeUsd=0.0,
            lastUpdated=None,
        )


@dataclass
class AffiliateMemory:
"""Indexed affiliate data with knowledge of data's age"""

data: dict[str, AffiliateData]
last_update: datetime
last_execution_id: str


class CachingAffiliateFetcher:
"""
Class containing, DuneClient, FileIO and a logger for convenient Dune Fetching.
"""

def __init__(
self, dune: DuneClient, execution_id: Optional[str], cache_validity: float = 30
) -> None:
"""
Class constructor.
Builds DuneClient from `api_key`.
- `cache_validity` is in minutes.
- `execution_id` can be used to avoid initial fetching if known
"""
log.info("loading affiliate data cache")
self.dune = dune
self.memory: AffiliateMemory = self._fetch_affiliates(execution_id)
self.cache_validity = timedelta(minutes=cache_validity)
log.info("affiliate data cache loaded")

    def _fetch_affiliates(self, execution_id: Optional[str] = None) -> AffiliateMemory:
        """Fetches affiliate data from Dune and returns it as indexed memory"""
        if execution_id:
            results = self.dune.get_result(execution_id)
        else:
            # https://dune.com/queries/1757231?d=1
            # Query executes in ~24 seconds.
            results = self.dune.refresh(Query(AFFILIATE_QUERY_ID), ping_frequency=10)
        log.info(f"query execution {results.execution_id} succeeded")

        last_update = results.times.execution_ended_at or utc_now()
Reviewer:

Why do we need the or statement? Wouldn't it be better to throw an error?

Contributor Author:

This is because execution_ended_at is an optional field and we want to have an actual value. Luckily, the field should always be populated if the query execution was successful. I could throw an error instead if you prefer.

Reviewer:

I think an error would be best practice to know if something did not go well.

Contributor Author:

Although we would already have errored on the refresh if this field was not available, I will also error here if it happens. Will do.

Contributor Author:

I think I would rather set a warning here and use utc_now because it makes sense and it's not clear what to do otherwise. We definitely don't want the app to crash. I could also log an error (so we know about it), but not crash.

Contributor Author:

In conclusion, I don't think we should "throw" an error, but rather "log" an error (so we are informed, but the app doesn't crash). This would lead us to notice and report a bug in Dune.
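
A minimal sketch of the resolution settled on here (log, don't crash); illustrative, the committed code may differ:

last_update = results.times.execution_ended_at
if last_update is None:
    # Should not happen for a successful execution; log an error so we can report a Dune bug.
    log.error(f"missing execution_ended_at for {results.execution_id}; falling back to utc_now()")
    last_update = utc_now()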

        return AffiliateMemory(
            last_update=last_update,
            last_execution_id=results.execution_id,
            data={
                row["trader"]: AffiliateData.from_dict(row, last_update=last_update)
                for row in results.get_rows()
            },
        )

    def _update_memory(self) -> None:
        """Internal method to update the cached affiliate data"""
        self.memory = self._fetch_affiliates()

    def cache_expired(self) -> bool:
        """
        Determines if data cache is expired.
        """
        before = self.memory.last_update
        delta = self.cache_validity
        return utc_now() > before + delta

    def get_affiliate_data(self, account: str) -> AffiliateData:
        """
        Returns affiliate data for `account`.
        If the record on hand is considered "too old",
        refreshes memory before return.
        """
        if not valid_address(account):
            log.warning(f"Invalid address received {account}")

        if self.cache_expired():
            log.debug("cache expired - refreshing results")
            self._update_memory()
Reviewer:

Does this mean that the API query could take 30 seconds? We should check with the front-end whether this is okay.
If it is not too expensive, we could also just update the cache in the background every 5 minutes and not update on new requests.

Contributor Author:

I would prefer to update in the background a bit less frequently.

The original idea I had for this was that we refresh whenever a request comes in AND the cache is expired, but we return the value we currently have. This way we don't need scheduled refreshes; demand triggers an update whenever necessary. Furthermore, nobody would ever have to wait because we always return something. I still need to figure out how exactly to implement this, but would prefer to do it in a follow-up PR (as it is a more sophisticated feature).

Reviewer:

> The original idea I had for this was that we refresh whenever a request comes in AND the cache is expired, but we return the value we currently have. [...]

But if we would ever like to serve two requests at the same time in parallel, then it could be that both trigger Dune to recalculate, right? Maybe it's not so important, as we will not get there soon. Not sure how we would handle it. (I don't know how concurrency in Python works.)

What do you dislike about my proposal? Is it the cost saving?

Reviewer:

I am okay to do it in another PR, sure.

Contributor Author:

Yea... the only thing about the proposal I am not fond of is the 5-minute updates. I would prefer something like 30. I also don't know how to implement the update in parallel with the service at this moment.

Contributor Author:

Important to remember that prices may not even become available so frequently.
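
A rough sketch of the demand-triggered, serve-stale idea discussed above (a hypothetical follow-up, not code from this PR; the non-blocking lock is one possible answer to the concurrency question):

import threading

from src.fetch.affiliate_data import AffiliateData, CachingAffiliateFetcher


class StaleWhileRevalidateFetcher(CachingAffiliateFetcher):
    """Always answers from current memory; refreshes in a background thread."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._refresh_lock = threading.Lock()

    def get_affiliate_data(self, account: str) -> AffiliateData:
        # Non-blocking acquire: at most one request starts a refresh, and
        # every request (including that one) returns the cached value immediately.
        if self.cache_expired() and self._refresh_lock.acquire(blocking=False):
            threading.Thread(target=self._refresh_and_release, daemon=True).start()
        return self.memory.data.get(account.lower(), AffiliateData.default())

    def _refresh_and_release(self) -> None:
        try:
            self._update_memory()
        finally:
            self._refresh_lock.release()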


        clean_address = account.lower()
        result = self.memory.data.get(clean_address, AffiliateData.default())
        log.debug(f"Found result for {account}: {result}")
        return result
15 changes: 15 additions & 0 deletions src/utils.py
@@ -1,5 +1,7 @@
"""Basic reusable utility functions"""
import os
import re
from datetime import datetime, timezone

from src.environment import QUERY_PATH

@@ -13,3 +15,16 @@ def open_query(filename: str) -> str:
def query_file(filename: str) -> str:
"""Returns proper path for filename in QUERY_PATH"""
return os.path.join(QUERY_PATH, filename)


def valid_address(address: str) -> bool:
"""Validates Ethereum addresses"""
match_result = re.match(
pattern=r"^(0x)?[0-9a-f]{40}$", string=address, flags=re.IGNORECASE
)
return match_result is not None


def utc_now() -> datetime:
"""Returns current UTC time"""
return datetime.now(tz=timezone.utc)
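
For illustration, the helpers above behave like this (a quick check, not part of the diff):

from src.utils import utc_now, valid_address

assert valid_address("0x0d5dc686d0a2abbfdafdfb4d0533e886517d4e83")
assert valid_address("0D5DC686D0A2ABBFDAFDFB4D0533E886517D4E83")  # "0x" optional, case-insensitive
assert not valid_address("0x1234")  # wrong length
print(utc_now().isoformat())  # timezone-aware UTC timestamp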
38 changes: 38 additions & 0 deletions tests/integration/test_caching_affiliate_fetcher.py
@@ -0,0 +1,38 @@
import os
import unittest

from dune_client.client import DuneClient

from src.fetch.affiliate_data import (
    CachingAffiliateFetcher,
)


class TestCachingAffiliateFetcher(unittest.TestCase):
"""This test requires valid API Key"""

def setUp(self) -> None:
self.dune = DuneClient(os.environ["DUNE_API_KEY"])

def test_affiliate_fetching(self):
fetcher = CachingAffiliateFetcher(self.dune, execution_id=None)
known_trader_and_referrer = "0x0d5dc686d0a2abbfdafdfb4d0533e886517d4e83"
affiliate_data = fetcher.get_affiliate_data(known_trader_and_referrer)
# As of January 17, 2023
self.assertGreater(affiliate_data.totalTrades, 241)
self.assertGreater(affiliate_data.totalReferrals, 7)
self.assertGreater(affiliate_data.tradeVolumeUsd, 75687577)
self.assertGreater(affiliate_data.referralVolumeUsd, 75788291)

def test_affiliate_fetching_deterministic(self):
fetcher = CachingAffiliateFetcher(
self.dune, execution_id="01GQ0YASEDSE1W7MENGTT1Q3AJ"
)
known_trader_and_referrer = "0x0d5dc686d0a2abbfdafdfb4d0533e886517d4e83"
affiliate_data = fetcher.get_affiliate_data(known_trader_and_referrer)

self.assertEqual(affiliate_data.totalTrades, 242)
self.assertEqual(affiliate_data.totalReferrals, 8)
self.assertAlmostEqual(affiliate_data.tradeVolumeUsd, 75687577.26913233)
self.assertAlmostEqual(affiliate_data.referralVolumeUsd, 75788291.13699351)