Refactor: complete serializers reorganization #317

Closed

Changes from all commits
1 change: 0 additions & 1 deletion apps/data_handler/__init__.py
@@ -1,2 +1 @@
"""Module docstring placeholder."""

1 change: 0 additions & 1 deletion apps/data_handler/celery_app/__init__.py
@@ -4,4 +4,3 @@
It sets up the necessary configurations and integrates with the broader
application to manage asynchronous tasks efficiently.
"""

15 changes: 8 additions & 7 deletions apps/data_handler/celery_app/celery_conf.py
@@ -5,9 +5,9 @@
# run_loan_states_computation_for_hashtack_v0,;
# run_loan_states_computation_for_hashtack_v1,; run_loan_states_computation_for_nostra_alpha,;
# run_loan_states_computation_for_nostra_mainnet,; run_loan_states_computation_for_zklend,;
# run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v1,; uniswap_v2_order_book,

# from data_handler.celery_app.tasks import (
@@ -68,20 +68,21 @@
"task": "ekubo_order_book",
"schedule": ORDER_BOOK_TIME_INTERVAL,
},
f"process_zklend_events_{CRONTAB_TIME}_mins": {
"task": "process_zklend_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
f"process_zklend_events_{CRONTAB_TIME}_mins": {
"task": "process_zklend_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
},
}

from data_handler.celery_app.order_books_tasks import ekubo_order_book
from data_handler.celery_app.tasks import (
run_liquidable_debt_computation_for_zklend, )
run_liquidable_debt_computation_for_zklend,
)

# run_loan_states_computation_for_hashtack_v0,; run_loan_states_computation_for_hashtack_v1,;
# run_loan_states_computation_for_nostra_alpha,; run_loan_states_computation_for_nostra_mainnet,;
# run_loan_states_computation_for_zklend,; run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v1,; uniswap_v2_order_book,

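Note on the beat-schedule hunk above: a plain number schedules a task every N seconds, while crontab(minute=f"*/{CRONTAB_TIME}") fires every CRONTAB_TIME minutes. A minimal stand-alone sketch of the same wiring, assuming app is the project's Celery instance; the broker URL and interval values below are placeholders:

# Sketch of the beat-schedule pattern above; broker URL and values illustrative.
from celery import Celery
from celery.schedules import crontab

app = Celery("data_handler", broker="redis://localhost:6379/0")  # placeholder broker

ORDER_BOOK_TIME_INTERVAL = 300  # seconds (illustrative)
CRONTAB_TIME = 5  # minutes (illustrative)

app.conf.beat_schedule = {
    "ekubo_order_book": {
        "task": "ekubo_order_book",
        "schedule": ORDER_BOOK_TIME_INTERVAL,  # plain number = run every N seconds
    },
    f"process_zklend_events_{CRONTAB_TIME}_mins": {
        "task": "process_zklend_events",
        "schedule": crontab(minute=f"*/{CRONTAB_TIME}"),  # every CRONTAB_TIME minutes
    },
}
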
13 changes: 9 additions & 4 deletions apps/data_handler/celery_app/event_tasks.py
@@ -11,6 +11,7 @@

logger = logging.getLogger(__name__)


@app.task(name="process_zklend_events")
def process_zklend_events():
"""
@@ -30,21 +31,25 @@ def process_zklend_events():
"Successfully processed ZkLend events in %.2fs. Blocks: %d to %d",
execution_time,
transformer.last_block - transformer.PAGINATION_SIZE,
transformer.last_block
transformer.last_block,
)
except (ValueError, TypeError, RuntimeError) as exc: # Catching more specific exceptions
except (
ValueError,
TypeError,
RuntimeError,
) as exc: # Catching more specific exceptions
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Error processing ZkLend events after %.2fs: %s",
execution_time,
exc,
exc_info=True
exc_info=True,
)
except Exception as exc: # Still keeping a general exception catch as a fallback
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Unexpected error processing ZkLend events after %.2fs: %s",
execution_time,
exc,
exc_info=True
exc_info=True,
)
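
The reworked task above follows a timing-and-logging shape worth naming: record a start time, log elapsed seconds on success, and log with exc_info=True on failure, catching specific exception types before the general fallback. A minimal sketch of that pattern with a placeholder work callable:

# Sketch of the timing/error-logging pattern used by process_zklend_events;
# the `timed_run` helper and `work` callable are illustrative.
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

def timed_run(work) -> None:
    start_time = datetime.utcnow()
    try:
        work()
        execution_time = (datetime.utcnow() - start_time).total_seconds()
        logger.info("Finished in %.2fs", execution_time)
    except (ValueError, TypeError, RuntimeError) as exc:  # specific failures first
        execution_time = (datetime.utcnow() - start_time).total_seconds()
        logger.error("Error after %.2fs: %s", execution_time, exc, exc_info=True)
    except Exception as exc:  # general fallback, mirroring the task above
        execution_time = (datetime.utcnow() - start_time).total_seconds()
        logger.error("Unexpected error after %.2fs: %s", execution_time, exc, exc_info=True)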
15 changes: 11 additions & 4 deletions apps/data_handler/celery_app/order_books_tasks.py
@@ -22,20 +22,27 @@ def ekubo_order_book():
"""
pool_states = EkuboAPIConnector().get_pools()
filtered_pool_states = [
pool_state for pool_state in pool_states if isinstance(pool_state, dict)
and pool_state["token0"] in TOKEN_MAPPING and pool_state["token1"] in TOKEN_MAPPING
pool_state
for pool_state in pool_states
if isinstance(pool_state, dict)
and pool_state["token0"] in TOKEN_MAPPING
and pool_state["token1"] in TOKEN_MAPPING
]
for pool_state in filtered_pool_states:
token_a = pool_state["token0"]
token_b = pool_state["token1"]
logging.getLogger().info(f"Fetching data for token pair: {token_a} and {token_b}")
logging.getLogger().info(
f"Fetching data for token pair: {token_a} and {token_b}"
)
try:
order_book = EkuboOrderBook(token_a, token_b)
order_book.fetch_price_and_liquidity()
serialized_data = order_book.serialize()
connector.write_to_db(OrderBookModel(**serialized_data.model_dump()))
except Exception as exc:
logger.info(f"With token pair: {token_a} and {token_b} something happened: {exc}")
logger.info(
f"With token pair: {token_a} and {token_b} something happened: {exc}"
)
continue


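The multi-line comprehension introduced above is behavior-preserving: one clause per line makes the type guard and the two mapping checks explicit. A self-contained illustration with hypothetical pool data:

# Stand-alone illustration of the pool-state filter; the token addresses and
# TOKEN_MAPPING contents are made up for the example.
TOKEN_MAPPING = {"0xa": "ETH", "0xb": "USDC"}

pool_states = [
    {"token0": "0xa", "token1": "0xb"},  # kept: both tokens mapped
    {"token0": "0xa", "token1": "0xc"},  # dropped: token1 not in mapping
    "not-a-dict",                        # dropped: fails the isinstance guard
]

filtered_pool_states = [
    pool_state
    for pool_state in pool_states
    if isinstance(pool_state, dict)
    and pool_state["token0"] in TOKEN_MAPPING
    and pool_state["token1"] in TOKEN_MAPPING
]
assert filtered_pool_states == [{"token0": "0xa", "token1": "0xb"}]
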
120 changes: 75 additions & 45 deletions apps/data_handler/db/crud.py
@@ -30,7 +30,7 @@
DepositEventModel,
CollateralEnabledDisabledEventModel,
BorrowingEventModel,
WithdrawalEventModel
WithdrawalEventModel,
)

logger = logging.getLogger(__name__)
@@ -77,7 +77,9 @@ def write_to_db(self, obj: Base = None) -> None:
finally:
db.close()

def get_object(self, model: Type[ModelType] = None, obj_id: uuid = None) -> ModelType | None:
def get_object(
self, model: Type[ModelType] = None, obj_id: uuid = None
) -> ModelType | None:
"""
Retrieves an object by its ID from the database.
:param: model: type[Base] = None
@@ -122,9 +124,11 @@ def _get_subquery(self) -> Subquery:
"""
session = self.Session()
return (
session.query(LoanState.user,
func.max(LoanState.block).label("latest_block")).group_by(LoanState.user
).subquery()
session.query(
LoanState.user, func.max(LoanState.block).label("latest_block")
)
.group_by(LoanState.user)
.subquery()
)

def get_latest_block_loans(self) -> Query:
@@ -136,13 +140,15 @@ def get_latest_block_loans(self) -> Query:
subquery = self._get_subquery()

result = (
session.query(LoanState).join(
session.query(LoanState)
.join(
subquery,
and_(
LoanState.user == subquery.c.user,
LoanState.block == subquery.c.latest_block,
),
).all()
)
.all()
)

return result
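
The reflowed _get_subquery / get_latest_block_loans pair is the usual "latest row per group" idiom: group by user to find each user's maximum block, then join back to fetch the full rows. A runnable sketch against in-memory SQLite with a trimmed-down LoanState (illustrative, not the project's real schema):

# Self-contained "latest row per user" sketch; model and data are placeholders.
from sqlalchemy import Column, Integer, String, and_, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class LoanState(Base):
    __tablename__ = "loan_state"
    id = Column(Integer, primary_key=True)
    user = Column(String)
    block = Column(Integer)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        LoanState(user="alice", block=10),
        LoanState(user="alice", block=20),  # latest for alice
        LoanState(user="bob", block=15),    # latest for bob
    ])
    session.commit()

    # One row per user holding that user's highest block.
    subquery = (
        session.query(LoanState.user, func.max(LoanState.block).label("latest_block"))
        .group_by(LoanState.user)
        .subquery()
    )
    # Join back to recover the full LoanState rows at those blocks.
    latest = (
        session.query(LoanState)
        .join(
            subquery,
            and_(
                LoanState.user == subquery.c.user,
                LoanState.block == subquery.c.latest_block,
            ),
        )
        .all()
    )
    assert {(r.user, r.block) for r in latest} == {("alice", 20), ("bob", 15)}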
@@ -189,10 +195,10 @@ def get_last_hashstack_loan_state(self, user_id: str) -> HashtackCollateralDebt:
db = self.Session()
try:
return (
db.query(HashtackCollateralDebt).filter(HashtackCollateralDebt.user_id == user_id
).order_by(
HashtackCollateralDebt.loan_id.desc()
).first()
db.query(HashtackCollateralDebt)
.filter(HashtackCollateralDebt.user_id == user_id)
.order_by(HashtackCollateralDebt.loan_id.desc())
.first()
)
finally:
db.close()
@@ -207,8 +213,9 @@ def get_last_block(self, protocol_id: ProtocolIDs) -> int:
db = self.Session()
try:
max_block = (
db.query(func.max(LoanState.block)).filter(LoanState.protocol_id == protocol_id
).scalar()
db.query(func.max(LoanState.block))
.filter(LoanState.protocol_id == protocol_id)
.scalar()
)
return max_block or 0
finally:
@@ -248,7 +255,8 @@ def write_loan_states_to_db(self, objects: List[LoanState]) -> None:
"timestamp": obj.timestamp,
"block": obj.block,
"deposit": obj.deposit,
} for obj in objects
}
for obj in objects
]
try:
# Use PostgreSQL's insert with on_conflict_do_update for upserting records
@@ -264,7 +272,9 @@ def write_loan_states_to_db(self, objects: List[LoanState]) -> None:
# Execute the upsert statement
db.execute(stmt)

logger.info(f"Updating or adding {len(objects)} loan states to the database.")
logger.info(
f"Updating or adding {len(objects)} loan states to the database."
)

# Commit the changes
db.commit()
Expand All @@ -277,7 +287,9 @@ def write_loan_states_to_db(self, objects: List[LoanState]) -> None:
db.close()
logging.info("Loan states have been written to the database.")

def get_latest_order_book(self, dex: str, token_a: str, token_b: str) -> OrderBookModel | None:
def get_latest_order_book(
self, dex: str, token_a: str, token_b: str
) -> OrderBookModel | None:
"""
Retrieves the latest order book for a given pair of tokens and DEX.
:param dex: str - The DEX name.
@@ -293,43 +305,49 @@ def get_latest_order_book(self, dex: str, token_a: str, token_b: str) -> OrderBookModel | None:
OrderBookModel.token_b == token_b,
)
max_timestamp = (
select(func.max(OrderBookModel.timestamp)
).where(order_book_condition).scalar_subquery()
select(func.max(OrderBookModel.timestamp))
.where(order_book_condition)
.scalar_subquery()
)
return db.execute(
select(OrderBookModel
).where(OrderBookModel.timestamp == max_timestamp, order_book_condition)
select(OrderBookModel).where(
OrderBookModel.timestamp == max_timestamp, order_book_condition
)
).scalar()
finally:
db.close()

def get_unique_users_last_block_objects(self, protocol_id: ProtocolIDs) -> LoanState:
def get_unique_users_last_block_objects(
self, protocol_id: ProtocolIDs
) -> LoanState:
"""
Retrieves the latest loan states for unique users.
"""
db = self.Session()
try:
# Create a subquery to get the max block for each user
subquery = (
db.query(LoanState.user,
func.max(
LoanState.block
).label("max_block")).filter(LoanState.protocol_id == protocol_id
).group_by(LoanState.user).subquery()
db.query(LoanState.user, func.max(LoanState.block).label("max_block"))
.filter(LoanState.protocol_id == protocol_id)
.group_by(LoanState.user)
.subquery()
)

# Alias the subquery for clarity
alias_subquery = aliased(subquery)

# Join the main LoanState table with the subquery
return (
db.query(LoanState).join(
db.query(LoanState)
.join(
alias_subquery,
and_(
LoanState.user == alias_subquery.c.user,
LoanState.block == alias_subquery.c.max_block,
),
).filter(LoanState.protocol_id == protocol_id).all()
)
.filter(LoanState.protocol_id == protocol_id)
.all()
)
finally:
db.close()
Expand All @@ -345,15 +363,19 @@ def get_last_interest_rate_record_by_protocol_id(
db = self.Session()
try:
return (
db.query(InterestRate).filter(InterestRate.protocol_id == protocol_id
).order_by(InterestRate.block.desc()).first()
db.query(InterestRate)
.filter(InterestRate.protocol_id == protocol_id)
.order_by(InterestRate.block.desc())
.first()
)
finally:
db.close()

def get_interest_rate_by_block(self, block_number: int, protocol_id: str) -> InterestRate:
def get_interest_rate_by_block(
self, block_number: int, protocol_id: str
) -> InterestRate:
"""
Fetch the closest InterestRate instance by block number that is less than or equal
Fetch the closest InterestRate instance by block number that is less than or equal
to the given block number.

:param protocol_id: The protocol ID to search for.
Expand All @@ -363,9 +385,11 @@ def get_interest_rate_by_block(self, block_number: int, protocol_id: str) -> Int
db = self.Session()
try:
return (
db.query(InterestRate).filter(InterestRate.protocol_id == protocol_id
).filter(InterestRate.block <= block_number
).order_by(desc(InterestRate.block)).first()
db.query(InterestRate)
.filter(InterestRate.protocol_id == protocol_id)
.filter(InterestRate.block <= block_number)
.order_by(desc(InterestRate.block))
.first()
)
finally:
db.close()
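
get_interest_rate_by_block is a "closest record at or before a block" lookup: filter to block <= N, order by block descending, take the first row (or None). The same query extracted as a small helper; the call shape is illustrative:

# Helper form of the lookup above; `db` is an open session, `InterestRate`
# the mapped model.
from sqlalchemy import desc

def interest_rate_at_block(db, InterestRate, protocol_id: str, block_number: int):
    return (
        db.query(InterestRate)
        .filter(InterestRate.protocol_id == protocol_id)
        .filter(InterestRate.block <= block_number)
        .order_by(desc(InterestRate.block))
        .first()  # highest block not exceeding block_number, or None
    )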
@@ -415,15 +439,16 @@ def get_zklend_by_user_ids(self, user_ids: List[str]) -> List[ZkLendCollateralDebt]:
session = self.Session()
try:
return (
session.query(ZkLendCollateralDebt).filter(
ZkLendCollateralDebt.user_id.in_(user_ids)
).all()
session.query(ZkLendCollateralDebt)
.filter(ZkLendCollateralDebt.user_id.in_(user_ids))
.all()
)
finally:
session.close()

def get_hashtack_by_loan_ids(self, loan_ids: List[str],
version: int) -> List[HashtackCollateralDebt]:
def get_hashtack_by_loan_ids(
self, loan_ids: List[str], version: int
) -> List[HashtackCollateralDebt]:
"""
Retrieve HashtackCollateralDebt records by loan_ids.
:param loan_ids: A list of user IDs to filter by.
Expand All @@ -433,9 +458,10 @@ def get_hashtack_by_loan_ids(self, loan_ids: List[str],
session = self.Session()
try:
return (
session.query(HashtackCollateralDebt).filter(
HashtackCollateralDebt.loan_id.in_(loan_ids)
).filter(HashtackCollateralDebt.version == version).all()
session.query(HashtackCollateralDebt)
.filter(HashtackCollateralDebt.loan_id.in_(loan_ids))
.filter(HashtackCollateralDebt.version == version)
.all()
)
finally:
session.close()
@@ -472,7 +498,9 @@ def save_collateral_enabled_by_user(
collateral = self._convert_decimal_to_float(collateral)
debt = self._convert_decimal_to_float(debt)
try:
record = (session.query(ZkLendCollateralDebt).filter_by(user_id=user_id).first())
record = (
session.query(ZkLendCollateralDebt).filter_by(user_id=user_id).first()
)
if record:
# Update existing record
if collateral is not None:
@@ -525,7 +553,9 @@ def save_debt_category(
borrowed_collateral = self._convert_decimal_to_float(borrowed_collateral)

try:
record = (session.query(HashtackCollateralDebt).filter_by(loan_id=loan_id).first())
record = (
session.query(HashtackCollateralDebt).filter_by(loan_id=loan_id).first()
)
logger.info(f"Going to save loan_id {loan_id}")
# if debt category is the same, update the record
if record and record.debt_category == debt_category:
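
The save_collateral_enabled_by_user and save_debt_category hunks share one shape: look the row up by its natural key, update it in place if found, otherwise insert it, then commit. A generic sketch of that update-or-create pattern; model and field names are placeholders:

# Generic update-or-create sketch mirroring the session handling in the diff.
def update_or_create(session, model, lookup: dict, values: dict) -> None:
    try:
        record = session.query(model).filter_by(**lookup).first()
        if record:
            for field, value in values.items():
                setattr(record, field, value)  # update the existing record
        else:
            session.add(model(**lookup, **values))  # insert a new record
        session.commit()
    finally:
        session.close()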