Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: quite a lot of stuff I've maintained for the past few months #715

Merged
merged 55 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
69eab43
chore: replace deprecated Logger.warn
BobTheBuidler Oct 8, 2024
b2602e2
feat: filter out lps with vaults
BobTheBuidler Oct 8, 2024
63887a0
feat: refactor to prep for bal apy previews
BobTheBuidler Oct 8, 2024
44e9f83
update make
0xBasically Oct 8, 2024
60988ef
feat: delay curve loading
BobTheBuidler Oct 8, 2024
7448733
chore: remove deprecated code
BobTheBuidler Oct 8, 2024
9e93063
chore: add comment
BobTheBuidler Oct 8, 2024
1fe6f0e
feat: async log loading
BobTheBuidler Oct 8, 2024
aa03fa7
feat: more stuff
BobTheBuidler Oct 8, 2024
589d446
feat: more stuff
BobTheBuidler Oct 8, 2024
d6c18c9
fix: load_strategies
BobTheBuidler Oct 8, 2024
8f8ae24
fix: Harvests
BobTheBuidler Oct 8, 2024
008913d
fix: registry loader time
BobTheBuidler Oct 8, 2024
5392357
str not encodable
BobTheBuidler Oct 8, 2024
bac8316
feat: support rkp3r in ypm
BobTheBuidler Oct 8, 2024
e19b386
fix: vault loader
BobTheBuidler Oct 8, 2024
8bee62e
chore: cleanup treasury-txs logs
BobTheBuidler Oct 8, 2024
fb5fb53
chore: contract -> Contract
BobTheBuidler Oct 8, 2024
c18f5fa
chore: cleanup
BobTheBuidler Oct 8, 2024
66f87f5
chore: add comment
BobTheBuidler Oct 8, 2024
3ee9aa3
feat: asyncify curve simple
BobTheBuidler Oct 8, 2024
3867862
chore: remove unneeded code
BobTheBuidler Oct 8, 2024
08eaa3a
fix: treasury txs TypeErr
BobTheBuidler Oct 8, 2024
1c60a6f
fix: from_address is None
BobTheBuidler Oct 8, 2024
090fa54
fix: SyntaxError
BobTheBuidler Oct 8, 2024
f6a19f1
fix(temp): logs for treasury-tx
BobTheBuidler Oct 8, 2024
d822ac2
chore: refactor with new dep members
BobTheBuidler Oct 8, 2024
a9b10dd
chore: replace contract with Contract
BobTheBuidler Oct 8, 2024
2a901c0
feat: optimize db entity handling
BobTheBuidler Oct 8, 2024
e22c38e
feat: load streams with treasry txs exporter
BobTheBuidler Oct 8, 2024
0a04813
fix: asyncify yeth
BobTheBuidler Oct 8, 2024
de1bb0a
feat: make build --no-cache
BobTheBuidler Oct 8, 2024
fbe8fe9
feat: processing queue
BobTheBuidler Oct 8, 2024
b35b609
fix: adapt to ypm 3.6
BobTheBuidler Oct 8, 2024
9a7d3b0
feat: make way less eth_getLogs calls
BobTheBuidler Oct 8, 2024
32354d1
feat: disable sms exporter on base
BobTheBuidler Oct 8, 2024
0117236
chore: replace deprecated dank_w3
BobTheBuidler Oct 8, 2024
885ae0a
fix: treasury and sms on non-mainnet
BobTheBuidler Oct 8, 2024
0363c0e
feat: use Queue, save ram
BobTheBuidler Oct 8, 2024
ca902c5
feat: sort txs
BobTheBuidler Oct 8, 2024
84774db
fix: hanging sorter
BobTheBuidler Oct 8, 2024
b752259
feat: brownie 1.20.3 disables eager caching
BobTheBuidler Oct 8, 2024
1627d97
feat: faster eth-abi
BobTheBuidler Oct 8, 2024
c8aec10
feat: faster builds with poetry
BobTheBuidler Nov 9, 2024
db9cff1
chore: add shitcoins to shitcoins
BobTheBuidler Nov 9, 2024
0c73a52
feat: sort txs
BobTheBuidler Nov 9, 2024
29c3c52
chore: bump eth-portfolio and ypricemagic
BobTheBuidler Nov 9, 2024
5f53f2f
feat: a bunch of old stuff
BobTheBuidler Nov 9, 2024
716ab75
chore: remove unused imports
BobTheBuidler Nov 9, 2024
49992cf
chore: refactor accountant
BobTheBuidler Nov 9, 2024
b3a4f1c
chore: bump deps
BobTheBuidler Nov 9, 2024
2cf031b
feat: optimize streams with caches
BobTheBuidler Nov 9, 2024
95f6f3b
chore: add detail to exc
BobTheBuidler Nov 9, 2024
df2aa4a
chore: rename constant
BobTheBuidler Nov 9, 2024
661963a
chore: bump brownie to 1.20
BobTheBuidler Nov 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: load streams with treasry txs exporter
  • Loading branch information
BobTheBuidler committed Oct 8, 2024
commit e22c38ee27f7fbb7b58a80a906ea09762ae6aa30
8 changes: 7 additions & 1 deletion scripts/exporters/treasury_transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from yearn.entities import TreasuryTx, deduplicate_internal_transfers
from yearn.outputs.postgres.utils import address_dbid, chain_dbid, token_dbid
from yearn.treasury import accountant
from yearn.treasury import accountant, streams
from yearn.treasury.treasury import YearnTreasury

sentry_sdk.set_tag('script','treasury_transactions_exporter')
Expand Down Expand Up @@ -55,6 +55,10 @@ def main() -> NoReturn:
@a_sync(default='sync')
async def load_new_txs(start_block: Block, end_block: Block) -> int:
"""returns: number of new txs"""
# NOTE: ensure stream loader task has been started
global _streams_task
if _streams_task is None:
_streams_task = asyncio.create_task(streams._get_coro())
futs = []
async for entry in treasury.ledger[start_block: end_block]:
if isinstance(entry, InternalTransfer) and entry.to_address == GNOSIS_SINGLETON:
Expand Down Expand Up @@ -145,3 +149,5 @@ def _validate_integrity_error(entry: LedgerEntry, log_index: int) -> None:
@db_session
def sort() -> None:
accountant.sort_txs(accountant.unsorted_txs())

_streams_task = None
177 changes: 3 additions & 174 deletions scripts/load_streams.py
Original file line number Diff line number Diff line change
@@ -1,185 +1,14 @@

import asyncio
import typing
from datetime import date, datetime, timedelta
from decimal import Decimal
from functools import lru_cache
from typing import Tuple, Optional

from a_sync import AsyncThreadPoolExecutor
import logging
from async_lru import alru_cache
from brownie import chain
from pony.orm import db_session
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
from y import Contract, get_price
from y.time import closest_block_after_timestamp_async

from yearn.entities import Stream, StreamedFunds, create_views
from yearn.treasury.constants import BUYER
from yearn.treasury.streams import YearnStreams
from yearn.utils import dates_generator
from yearn.entities import create_views
from yearn.treasury import streams

logger = logging.getLogger(__name__)

create_views()
streams = YearnStreams()

# TODO: Fit this better into the rest of the exporter so it runs in tandem with treasury txs exporter

@db_session
def main():
#print('buybacks active stream(s)')
for stream in streams.buyback_streams():
#stream.print()
pass

team_dai_streams = [stream for stream in all_other_dai_streams() if int(stream.amount_per_second) == 385802469135802432]

#print(f'{len(team_dai_streams)} team dai streams')
total_rate = 0
for i, stream in enumerate(team_dai_streams):
total_rate += stream.amount_per_second
total_rate /= stream.scale
#print(f'team dai per second: {total_rate}')
#print(f'team dai per day: {total_rate * 60 * 60 * 24}')

v3_multisig_i_think = "0x16388463d60FFE0661Cf7F1f31a7D658aC790ff7"
v3_dai_streams = [stream for stream in all_other_dai_streams() if stream.to_address.address == v3_multisig_i_think]

print(f'{len(v3_dai_streams)} v3 dai streams')
total_rate = 0
for i, stream in enumerate(v3_dai_streams):
total_rate += stream.amount_per_second
total_rate /= stream.scale
print(f'v3 dai per second: {total_rate}')
print(f'v3 dai per day: {total_rate * 60 * 60 * 24}')

misc_dai_streams = [stream for stream in all_other_dai_streams() if stream not in team_dai_streams and stream.to_address.address != v3_multisig_i_think]

print('all other active streams')
total_rate = 0
for i, stream in enumerate(misc_dai_streams):
print(f'stream {i}')
total_rate += stream.amount_per_second
#print(stream.amount_per_second)
stream.print()
total_rate /= stream.scale
print(f'all other streams dai per second: {total_rate}')
print(f'all other streams dai per day: {total_rate * 60 * 60 * 24}')

yfi_streams = streams.yfi_streams()

normal_daily_rate = Decimal(5.2950767511632e-07) * 60 * 60 * 24
print(f'{len(yfi_streams)} active yfi streams')
total_rate = 0
for i, stream in enumerate(yfi_streams):
rate = stream.amount_per_second
total_rate += rate
if rate != 5.2950767511632e-07:
print(f'stream {i}')
stream.print()
print(f'normal YFI per day: {normal_daily_rate}')
daily_rate = rate * 60 * 60 * 24
daily_difference = daily_rate - normal_daily_rate
print(f'daily difference: {daily_difference} YFI')
print(f'monthly difference: {daily_difference * 30} YFI')
print(f'recipient: {stream.to_address.address}')

print('')
print(f'total yfi per second: {total_rate}')
print(f'total yfi per day: {total_rate * 60 * 60 * 24}')

asyncio.get_event_loop().run_until_complete(process_streams())

@db_session
def all_other_dai_streams():
return [s for s in streams.dai_streams() if s.to_address.address != BUYER]


threads = AsyncThreadPoolExecutor(8)

@lru_cache
@db_session
def _get_token_for_stream(stream_id):
return Stream[stream_id].token.address.address

@db_session
def _get_stream_contract(stream_id: str) -> str:
return Stream[stream_id].contract.address

@db_session
def get_start_date(stream_id: str) -> date:
return Stream[stream_id].start_date

@db_session
def is_closed(stream_id: str) -> bool:
return bool(StreamedFunds.get(stream=Stream[stream_id], is_last_day=True))

@alru_cache
async def get_stream_contract(stream_id: str) -> Contract:
address = await threads.run(_get_stream_contract, stream_id)
return await Contract.coroutine(address)

async def start_timestamp(stream_id: str, block: typing.Optional[int] = None) -> int:
contract = await get_stream_contract(stream_id)
return int(await contract.streamToStart.coroutine(f'0x{stream_id}', block_identifier=block))

ONE_DAY = 60 * 60 * 24

async def process_stream_for_date(stream_id: str, date: datetime) -> Optional[StreamedFunds]:
if entity := await threads.run(StreamedFunds.get_entity, stream_id, date):
return entity

stream_token = _get_token_for_stream(stream_id)
check_at = date + timedelta(days=1) - timedelta(seconds=1)
block = await closest_block_after_timestamp_async(int(check_at.timestamp()))
price_fut = asyncio.create_task(get_price(stream_token, block, sync=False))
_start_timestamp = await start_timestamp(stream_id, block)
if _start_timestamp == 0:
# If the stream was already closed, we can return `None`.
if await threads.run(is_closed, stream_id):
price_fut.cancel()
return None

while _start_timestamp == 0:
# is active last block?
block -= 1
_start_timestamp = await start_timestamp(stream_id, block)

block_datetime = datetime.fromtimestamp(chain[block].timestamp)
assert block_datetime.date() == date.date()
seconds_active = (check_at - block_datetime).seconds
is_last_day = True
else:
seconds_active = int(check_at.timestamp()) - _start_timestamp
is_last_day = False

# How many seconds was the stream active on `date`?
seconds_active_today = seconds_active if seconds_active < ONE_DAY else ONE_DAY
if seconds_active_today < ONE_DAY and not is_last_day:
if date.date() == await threads.run(get_start_date, stream_id):
logger.debug('stream started today, partial day accepted')
else:
seconds_active_today = ONE_DAY
logger.debug(F"active for {seconds_active_today} seconds on {date.date()}")
if is_last_day:
logger.debug('is last day')

price = Decimal(await price_fut)
return await threads.run(StreamedFunds.create_entity, stream_id, date, price, seconds_active_today, is_last_day)

def get_start_and_end(stream: Stream) -> Tuple[datetime, datetime]:
start_timestamp = datetime.fromtimestamp(chain[stream.start_block].timestamp)
end_timestamp = datetime.fromtimestamp(chain[stream.end_block].timestamp) if stream.end_block else datetime.utcnow()
return start_timestamp, end_timestamp

async def process_stream(stream, run_forever: bool = False) -> None:
# NOTE: We need to go one-by-one for the math to be right
async for date in dates_generator(*get_start_and_end(stream), stop_at_today=not run_forever):
if await process_stream_for_date(stream.stream_id, date) is None:
return

async def process_streams(run_forever: bool = False):
await tqdm_asyncio.gather(*[process_stream(stream, run_forever=run_forever) for stream in streams.streams(include_inactive=True)], desc='Loading streams')
asyncio.run(streams._get_coro())
Loading