diff --git a/apps/data_handler/celery_app/celery_conf.py b/apps/data_handler/celery_app/celery_conf.py index 4697a5a9..95cd486a 100644 --- a/apps/data_handler/celery_app/celery_conf.py +++ b/apps/data_handler/celery_app/celery_conf.py @@ -5,9 +5,9 @@ # run_loan_states_computation_for_hashtack_v0,; # run_loan_states_computation_for_hashtack_v1,; run_loan_states_computation_for_nostra_alpha,; # run_loan_states_computation_for_nostra_mainnet,; run_loan_states_computation_for_zklend,; -# run_liquidable_debt_computation_for_nostra_alpha,; +# run_liquidable_debt_computation_for_nostra_alpha,; # run_liquidable_debt_computation_for_nostra_mainnet,; -# run_liquidable_debt_computation_for_hashstack_v0,; +# run_liquidable_debt_computation_for_hashstack_v0,; # run_liquidable_debt_computation_for_hashstack_v1,; uniswap_v2_order_book, # from data_handler.celery_app.tasks import ( @@ -68,20 +68,23 @@ "task": "ekubo_order_book", "schedule": ORDER_BOOK_TIME_INTERVAL, }, - f"process_zklend_events_{CRONTAB_TIME}_mins": { - "task": "process_zklend_events", - "schedule": crontab(minute=f"*/{CRONTAB_TIME}"), + f"process_zklend_events_{CRONTAB_TIME}_mins": { + "task": "process_zklend_events", + "schedule": crontab(minute=f"*/{CRONTAB_TIME}"), + }, + f"process_nostra_events_{CRONTAB_TIME}_mins": { + "task": "process_nostra_events", + "schedule": crontab(minute=f"*/{CRONTAB_TIME}"), }, } from data_handler.celery_app.order_books_tasks import ekubo_order_book -from data_handler.celery_app.tasks import ( - run_liquidable_debt_computation_for_zklend, ) +from data_handler.celery_app.tasks import run_liquidable_debt_computation_for_zklend # run_loan_states_computation_for_hashtack_v0,; run_loan_states_computation_for_hashtack_v1,; # run_loan_states_computation_for_nostra_alpha,; run_loan_states_computation_for_nostra_mainnet,; # run_loan_states_computation_for_zklend,; run_liquidable_debt_computation_for_nostra_alpha,; -# run_liquidable_debt_computation_for_nostra_mainnet,; +# 
"""
Celery tasks that fetch protocol events from the chain, transform them and
persist them to the database.
"""

import logging
from datetime import datetime

from data_handler.celery_app.celery_conf import app
from data_handler.handlers.events.nostra.transform_events import NostraTransformer
from data_handler.handlers.events.zklend.transform_events import ZklendTransformer

logger = logging.getLogger(__name__)


def _process_protocol_events(transformer_cls, protocol_name: str) -> None:
    """
    Shared implementation for the per-protocol event tasks.

    The zkLend and Nostra task bodies were exact duplicates apart from the
    transformer class and the protocol name in the log messages, so the common
    flow (run the transformer, time it, log success or failure) lives here.

    :param transformer_cls: transformer class with a no-arg constructor, a
        ``run()`` method and ``last_block`` / ``PAGINATION_SIZE`` attributes.
    :param protocol_name: human-readable protocol name for log messages.
    """
    start_time = datetime.utcnow()
    logger.info("Starting %s event processing", protocol_name)
    try:
        # Initialize and run transformer
        transformer = transformer_cls()
        transformer.run()
        # Log success metrics
        execution_time = (datetime.utcnow() - start_time).total_seconds()
        logger.info(
            "Successfully processed %s events in %.2fs. Blocks: %d to %d",
            protocol_name,
            execution_time,
            transformer.last_block - transformer.PAGINATION_SIZE,
            transformer.last_block,
        )
    except (ValueError, TypeError, RuntimeError) as exc:
        # Known failure modes: log with traceback but keep the worker alive.
        execution_time = (datetime.utcnow() - start_time).total_seconds()
        logger.error(
            "Error processing %s events after %.2fs: %s",
            protocol_name,
            execution_time,
            exc,
            exc_info=True,
        )
    except Exception as exc:  # last-resort guard for unexpected failures
        execution_time = (datetime.utcnow() - start_time).total_seconds()
        logger.error(
            "Unexpected error processing %s events after %.2fs: %s",
            protocol_name,
            execution_time,
            exc,
            exc_info=True,
        )


@app.task(name="process_zklend_events")
def process_zklend_events():
    """
    Process and store ZkLend protocol events.

    Fetches events from the blockchain, transforms them into the required
    format, and saves them to the database.
    """
    _process_protocol_events(ZklendTransformer, "ZkLend")


@app.task(name="process_nostra_events")
def process_nostra_events():
    """
    Process and store Nostra protocol events.

    Fetches events from the blockchain, transforms them into the required
    format, and saves them to the database.
    """
    _process_protocol_events(NostraTransformer, "Nostra")
- ZkLendEventDBConnector: Manages ZkLend event-specific operations.""" import logging @@ -8,12 +9,6 @@ from typing import List, Optional, Type, TypeVar from data_handler.db.database import SQLALCHEMY_DATABASE_URL -from shared.constants import ProtocolIDs -from sqlalchemy import Subquery, and_, create_engine, desc, func, select -from sqlalchemy.dialects.postgresql import insert -from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.orm import Query, Session, aliased, scoped_session, sessionmaker - from data_handler.db.models import ( Base, HashtackCollateralDebt, @@ -22,16 +17,26 @@ OrderBookModel, ZkLendCollateralDebt, ) - +from data_handler.db.models.nostra_events import ( + BearingCollateralBurnEventModel, + DebtBurnEventModel, + DebtMintEventModel, + DebtTransferEventModel, +) from data_handler.db.models.zklend_events import ( AccumulatorsSyncEventModel, + BorrowingEventModel, + CollateralEnabledDisabledEventModel, + DepositEventModel, LiquidationEventModel, RepaymentEventModel, - DepositEventModel, - CollateralEnabledDisabledEventModel, - BorrowingEventModel, - WithdrawalEventModel + WithdrawalEventModel, ) +from shared.constants import ProtocolIDs +from sqlalchemy import Subquery, and_, create_engine, desc, func, select +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import Query, Session, aliased, scoped_session, sessionmaker logger = logging.getLogger(__name__) ModelType = TypeVar("ModelType", bound=Base) @@ -77,7 +82,9 @@ def write_to_db(self, obj: Base = None) -> None: finally: db.close() - def get_object(self, model: Type[ModelType] = None, obj_id: uuid = None) -> ModelType | None: + def get_object( + self, model: Type[ModelType] = None, obj_id: uuid = None + ) -> ModelType | None: """ Retrieves an object by its ID from the database. 
:param: model: type[Base] = None @@ -122,9 +129,11 @@ def _get_subquery(self) -> Subquery: """ session = self.Session() return ( - session.query(LoanState.user, - func.max(LoanState.block).label("latest_block")).group_by(LoanState.user - ).subquery() + session.query( + LoanState.user, func.max(LoanState.block).label("latest_block") + ) + .group_by(LoanState.user) + .subquery() ) def get_latest_block_loans(self) -> Query: @@ -136,13 +145,15 @@ def get_latest_block_loans(self) -> Query: subquery = self._get_subquery() result = ( - session.query(LoanState).join( + session.query(LoanState) + .join( subquery, and_( LoanState.user == subquery.c.user, LoanState.block == subquery.c.latest_block, ), - ).all() + ) + .all() ) return result @@ -189,10 +200,10 @@ def get_last_hashstack_loan_state(self, user_id: str) -> HashtackCollateralDebt: db = self.Session() try: return ( - db.query(HashtackCollateralDebt).filter(HashtackCollateralDebt.user_id == user_id - ).order_by( - HashtackCollateralDebt.loan_id.desc() - ).first() + db.query(HashtackCollateralDebt) + .filter(HashtackCollateralDebt.user_id == user_id) + .order_by(HashtackCollateralDebt.loan_id.desc()) + .first() ) finally: db.close() @@ -207,8 +218,9 @@ def get_last_block(self, protocol_id: ProtocolIDs) -> int: db = self.Session() try: max_block = ( - db.query(func.max(LoanState.block)).filter(LoanState.protocol_id == protocol_id - ).scalar() + db.query(func.max(LoanState.block)) + .filter(LoanState.protocol_id == protocol_id) + .scalar() ) return max_block or 0 finally: @@ -248,7 +260,8 @@ def write_loan_states_to_db(self, objects: List[LoanState]) -> None: "timestamp": obj.timestamp, "block": obj.block, "deposit": obj.deposit, - } for obj in objects + } + for obj in objects ] try: # Use PostgreSQL's insert with on_conflict_do_update for upserting records @@ -264,7 +277,9 @@ def write_loan_states_to_db(self, objects: List[LoanState]) -> None: # Execute the upsert statement db.execute(stmt) - logger.info(f"Updating or 
adding {len(objects)} loan states to the database.") + logger.info( + f"Updating or adding {len(objects)} loan states to the database." + ) # Commit the changes db.commit() @@ -277,7 +292,9 @@ def write_loan_states_to_db(self, objects: List[LoanState]) -> None: db.close() logging.info("Loan states have been written to the database.") - def get_latest_order_book(self, dex: str, token_a: str, token_b: str) -> OrderBookModel | None: + def get_latest_order_book( + self, dex: str, token_a: str, token_b: str + ) -> OrderBookModel | None: """ Retrieves the latest order book for a given pair of tokens and DEX. :param dex: str - The DEX name. @@ -293,17 +310,21 @@ def get_latest_order_book(self, dex: str, token_a: str, token_b: str) -> OrderBo OrderBookModel.token_b == token_b, ) max_timestamp = ( - select(func.max(OrderBookModel.timestamp) - ).where(order_book_condition).scalar_subquery() + select(func.max(OrderBookModel.timestamp)) + .where(order_book_condition) + .scalar_subquery() ) return db.execute( - select(OrderBookModel - ).where(OrderBookModel.timestamp == max_timestamp, order_book_condition) + select(OrderBookModel).where( + OrderBookModel.timestamp == max_timestamp, order_book_condition + ) ).scalar() finally: db.close() - def get_unique_users_last_block_objects(self, protocol_id: ProtocolIDs) -> LoanState: + def get_unique_users_last_block_objects( + self, protocol_id: ProtocolIDs + ) -> LoanState: """ Retrieves the latest loan states for unique users. 
""" @@ -311,11 +332,10 @@ def get_unique_users_last_block_objects(self, protocol_id: ProtocolIDs) -> LoanS try: # Create a subquery to get the max block for each user subquery = ( - db.query(LoanState.user, - func.max( - LoanState.block - ).label("max_block")).filter(LoanState.protocol_id == protocol_id - ).group_by(LoanState.user).subquery() + db.query(LoanState.user, func.max(LoanState.block).label("max_block")) + .filter(LoanState.protocol_id == protocol_id) + .group_by(LoanState.user) + .subquery() ) # Alias the subquery for clarity @@ -323,13 +343,16 @@ def get_unique_users_last_block_objects(self, protocol_id: ProtocolIDs) -> LoanS # Join the main LoanState table with the subquery return ( - db.query(LoanState).join( + db.query(LoanState) + .join( alias_subquery, and_( LoanState.user == alias_subquery.c.user, LoanState.block == alias_subquery.c.max_block, ), - ).filter(LoanState.protocol_id == protocol_id).all() + ) + .filter(LoanState.protocol_id == protocol_id) + .all() ) finally: db.close() @@ -345,15 +368,19 @@ def get_last_interest_rate_record_by_protocol_id( db = self.Session() try: return ( - db.query(InterestRate).filter(InterestRate.protocol_id == protocol_id - ).order_by(InterestRate.block.desc()).first() + db.query(InterestRate) + .filter(InterestRate.protocol_id == protocol_id) + .order_by(InterestRate.block.desc()) + .first() ) finally: db.close() - def get_interest_rate_by_block(self, block_number: int, protocol_id: str) -> InterestRate: + def get_interest_rate_by_block( + self, block_number: int, protocol_id: str + ) -> InterestRate: """ - Fetch the closest InterestRate instance by block number that is less than or equal + Fetch the closest InterestRate instance by block number that is less than or equal to the given block number. :param protocol_id: The protocol ID to search for. 
@@ -363,9 +390,11 @@ def get_interest_rate_by_block(self, block_number: int, protocol_id: str) -> Int db = self.Session() try: return ( - db.query(InterestRate).filter(InterestRate.protocol_id == protocol_id - ).filter(InterestRate.block <= block_number - ).order_by(desc(InterestRate.block)).first() + db.query(InterestRate) + .filter(InterestRate.protocol_id == protocol_id) + .filter(InterestRate.block <= block_number) + .order_by(desc(InterestRate.block)) + .first() ) finally: db.close() @@ -415,15 +444,16 @@ def get_zklend_by_user_ids(self, user_ids: List[str]) -> List[ZkLendCollateralDe session = self.Session() try: return ( - session.query(ZkLendCollateralDebt).filter( - ZkLendCollateralDebt.user_id.in_(user_ids) - ).all() + session.query(ZkLendCollateralDebt) + .filter(ZkLendCollateralDebt.user_id.in_(user_ids)) + .all() ) finally: session.close() - def get_hashtack_by_loan_ids(self, loan_ids: List[str], - version: int) -> List[HashtackCollateralDebt]: + def get_hashtack_by_loan_ids( + self, loan_ids: List[str], version: int + ) -> List[HashtackCollateralDebt]: """ Retrieve HashtackCollateralDebt records by loan_ids. :param loan_ids: A list of user IDs to filter by. 
class NostraEventDBConnector(DBConnector):
    """
    Provides CRUD operations specifically for Nostra events: DebtMint,
    DebtBurn, DebtTransfer and BearingCollateralBurn.
    """

    def _create_event(
        self,
        model,
        label: str,
        protocol_id: str,
        event_name: str,
        block_number: int,
        fields: dict,
    ) -> None:
        """
        Persist a single event row of *model*.

        All four public ``create_*_event`` methods shared an identical
        session/commit/rollback skeleton and differed only in the model class
        and the event-specific columns, so that skeleton lives here.

        :param model: the event model class to instantiate.
        :param label: short event name used in the success log message.
        :param fields: event-specific column values (e.g. user/amount).
        :raises SQLAlchemyError: re-raised after rollback and logging.
        """
        db = self.Session()
        try:
            event = model(
                protocol_id=protocol_id,
                event_name=event_name,
                block_number=block_number,
                **fields,
            )
            db.add(event)
            db.commit()
            logger.info(f"{label} event saved: {event}")
        except SQLAlchemyError as e:
            db.rollback()
            logger.error(f"Error creating {model.__name__}: {e}")
            raise e
        finally:
            db.close()

    def create_debt_mint_event(
        self, protocol_id: str, event_name: str, block_number: int, event_data: dict
    ) -> None:
        """
        Creates a DebtMintEventModel record in the database.
        """
        self._create_event(
            DebtMintEventModel,
            "DebtMint",
            protocol_id,
            event_name,
            block_number,
            {"user": event_data.get("user"), "amount": event_data.get("amount")},
        )

    def create_debt_burn_event(
        self, protocol_id: str, event_name: str, block_number: int, event_data: dict
    ) -> None:
        """
        Creates a DebtBurnEventModel record in the database.
        """
        self._create_event(
            DebtBurnEventModel,
            "DebtBurn",
            protocol_id,
            event_name,
            block_number,
            {"user": event_data.get("user"), "amount": event_data.get("amount")},
        )

    def create_debt_transfer_event(
        self, protocol_id: str, event_name: str, block_number: int, event_data: dict
    ) -> None:
        """
        Creates a DebtTransferEventModel record in the database.
        """
        self._create_event(
            DebtTransferEventModel,
            "DebtTransfer",
            protocol_id,
            event_name,
            block_number,
            {
                "sender": event_data.get("sender"),
                "recipient": event_data.get("recipient"),
                "amount": event_data.get("amount"),
            },
        )

    def create_bearing_collateral_burn_event(
        self, protocol_id: str, event_name: str, block_number: int, event_data: dict
    ) -> None:
        """
        Creates a BearingCollateralBurnEventModel record in the database.
        """
        self._create_event(
            BearingCollateralBurnEventModel,
            "BearingCollateralBurn",
            protocol_id,
            event_name,
            block_number,
            {"user": event_data.get("user"), "amount": event_data.get("amount")},
        )

    def get_all_events(
        self,
        protocol_id: Optional[str] = None,
        event_name: Optional[str] = None,
        block_number: Optional[int] = None,
    ) -> List:
        """
        Retrieves all types of Nostra events based on filtering criteria.

        Bug fix: the previous implementation UNION ALL-ed queries over models
        with incompatible column sets (DebtTransferEventModel has
        sender/recipient where the others have user), which the database
        rejects. Results are now fetched per model and concatenated in Python.

        :param protocol_id: optional protocol-id filter applied to every model.
        :param event_name: optional event-name filter.
        :param block_number: optional exact block-number filter.
        :return: list of matching event model instances (mixed types).
        :raises SQLAlchemyError: re-raised after logging.
        """
        db = self.Session()
        try:

            def apply_filters(model):
                # Build one filtered query per model; None means "no filter".
                query = db.query(model)
                if protocol_id is not None:
                    query = query.filter(model.protocol_id == protocol_id)
                if event_name is not None:
                    query = query.filter(model.event_name == event_name)
                if block_number is not None:
                    query = query.filter(model.block_number == block_number)
                return query

            events: List = []
            for model in (
                DebtMintEventModel,
                DebtBurnEventModel,
                DebtTransferEventModel,
                BearingCollateralBurnEventModel,
            ):
                events.extend(apply_filters(model).all())
            return events

        except SQLAlchemyError as e:
            logger.error(f"Error retrieving Nostra events: {e}")
            raise e

        finally:
            db.close()
b/apps/data_handler/handler_tools/data_parser/nostra.py @@ -1,16 +1,18 @@ """ This module contains the logic to parse the nostra data to human-readable format. """ +from decimal import Decimal from typing import Any, List + from data_handler.handler_tools.data_parser.serializers import ( - DebtMintEventData, + BearingCollateralBurnEventData, + BearingCollateralMintEventData, DebtBurnEventData, - InterestRateModelEventData, + DebtMintEventData, DebtTransferEventData, - BearingCollateralMintEventData, - BearingCollateralBurnEventData, + InterestRateModelEventData, + NonInterestBearingCollateralBurnEventData, NonInterestBearingCollateralMintEventData, - NonInterestBearingCollateralBurnEventData ) @@ -18,6 +20,7 @@ class NostraDataParser: """ Parses the nostra data to human-readable format. """ + @classmethod def parse_interest_rate_model_event( cls, event_data: List[Any] @@ -26,11 +29,11 @@ def parse_interest_rate_model_event( Parses the interest rate model event data into a human-readable format. The event data is fetched from on-chain logs and is structured in the following way: - event_data[0]: The debt token address (as a hexadecimal string). - - event_data[5]: The lending interest rate index (as a hexadecimal in 18 decimal places). + - event_data[5]: The lending interest rate index (as a hexadecimal in 18 decimal places). - event_data[7]: The borrow interest rate index (as a hexadecimal in 18 decimal places). Args: event_data (List[Any]): A list containing the raw event data. - Expected order: [debt_token, lending_rate, _, borrow_rate, _, + Expected order: [debt_token, lending_rate, _, borrow_rate, _, lending_index, _, borrow_index, _] Returns: InterestRateModelEventData: A model with the parsed event data. 
@@ -38,9 +41,9 @@ def parse_interest_rate_model_event( return InterestRateModelEventData( debt_token=event_data[0], lending_index=event_data[5], - borrow_index=event_data[7] + borrow_index=event_data[7], ) - + @classmethod def parse_non_interest_bearing_collateral_mint_event( cls, event_data: list[Any] @@ -50,7 +53,7 @@ def parse_non_interest_bearing_collateral_mint_event( The event data is structured as follows: - event_data[0]: sender address - - event_data[1]: recipient address + - event_data[1]: recipient address - event_data[2]: raw amount Args: @@ -61,9 +64,7 @@ def parse_non_interest_bearing_collateral_mint_event( NonInterestBearingCollateralMintEventData: A model with the parsed event data. """ return NonInterestBearingCollateralMintEventData( - sender=event_data[0], - recipient=event_data[1], - raw_amount=event_data[2] + sender=event_data[0], recipient=event_data[1], raw_amount=event_data[2] ) @classmethod @@ -85,14 +86,15 @@ def parse_non_interest_bearing_collateral_burn_event( NonInterestBearingCollateralBurnEventData: A model with the parsed event data. """ return NonInterestBearingCollateralBurnEventData( - user=event_data[0], - face_amount=event_data[1] + user=event_data[0], face_amount=event_data[1] ) - def parse_interest_bearing_collateral_mint_event(self, event_data: list[Any]) -> BearingCollateralMintEventData: + def parse_interest_bearing_collateral_mint_event( + self, event_data: list[Any] + ) -> BearingCollateralMintEventData: """ Parses the BearingCollateralMint event data into a human-readable format using the BearingCollateralMintEventData serializer. - + The event data is fetched from on-chain logs and is structured in the following way: - event_data[0]: The user address (as a hexadecimal string). 
- event_data[1]: Token amount @@ -109,10 +111,12 @@ def parse_interest_bearing_collateral_mint_event(self, event_data: list[Any]) -> amount=event_data[1], ) - def parse_interest_bearing_collateral_burn_event(self, event_data: list[Any]) -> BearingCollateralBurnEventData: + def parse_interest_bearing_collateral_burn_event( + self, event_data: list[Any] + ) -> BearingCollateralBurnEventData: """ Parses the BearingCollateralMint event data into a human-readable format using the BearingCollateralMintEventData serializer. - + The event data is fetched from on-chain logs and is structured in the following way: - event_data[0]: The user address (as a hexadecimal string). - event_data[1]: Token amount @@ -126,12 +130,10 @@ def parse_interest_bearing_collateral_burn_event(self, event_data: list[Any]) -> """ return BearingCollateralBurnEventData( user=event_data[0], - amount=event_data[1], + amount=Decimal(int(event_data[1], 16)), ) - def parse_debt_transfer_event( - cls, event_data: List[Any], from_address: str - ) -> DebtTransferEventData: + def parse_debt_transfer_event(self, event_data: List[Any]) -> DebtTransferEventData: """ Parses the debt transfer event data into a human-readable format using the DebtBurnEventData serializer. @@ -139,7 +141,6 @@ def parse_debt_transfer_event( Args: event_data (List[Any]): A list containing the raw event data. Expected order: [sender, recipient, value, _] - from_address (str): The address of the token contract Returns: DebtTransferEventData: A model with the parsed event data. 
@@ -147,8 +148,7 @@ def parse_debt_transfer_event( return DebtTransferEventData( sender=event_data[0], recipient=event_data[1], - amount=event_data[2], - token=from_address + amount=Decimal(int(event_data[2], 16)), ) @classmethod @@ -166,7 +166,7 @@ def parse_debt_mint_event(cls, event_data: list[Any]) -> DebtMintEventData: """ return DebtMintEventData( user=event_data[0], - token=event_data[1] + amount=Decimal(int(event_data[1], 16)), ) @classmethod @@ -184,5 +184,5 @@ def parse_debt_burn_event(cls, event_data: list[Any]) -> DebtBurnEventData: """ return DebtBurnEventData( user=event_data[0], - amount=event_data[1], + amount=Decimal(int(event_data[1], 16)), ) diff --git a/apps/data_handler/handlers/events/nostra/__init__.py b/apps/data_handler/handlers/events/nostra/__init__.py new file mode 100644 index 00000000..4eed3336 --- /dev/null +++ b/apps/data_handler/handlers/events/nostra/__init__.py @@ -0,0 +1 @@ +"""Module docstring placeholder.""" diff --git a/apps/data_handler/handlers/events/nostra/transform_events.py b/apps/data_handler/handlers/events/nostra/transform_events.py new file mode 100644 index 00000000..12bdbbe8 --- /dev/null +++ b/apps/data_handler/handlers/events/nostra/transform_events.py @@ -0,0 +1,220 @@ +""" +This module contains the NostraTransformer class, +which is used to transform Nostra events. 
+""" + +import logging +from decimal import Decimal +from typing import Any, Callable, Dict, Tuple, Type + +from data_handler.db.crud import NostraEventDBConnector +from data_handler.db.models.base import Base +from data_handler.db.models.nostra_events import ( + BearingCollateralBurnEventModel, + DebtBurnEventModel, + DebtMintEventModel, + DebtTransferEventModel, +) +from data_handler.handler_tools.api_connector import DeRiskAPIConnector +from data_handler.handler_tools.constants import ProtocolAddresses +from data_handler.handler_tools.data_parser.nostra import NostraDataParser +from data_handler.handler_tools.data_parser.serializers import ( + BearingCollateralBurnEventData, + DebtBurnEventData, + DebtMintEventData, + DebtTransferEventData, +) +from shared.constants import ProtocolIDs + +logger = logging.getLogger(__name__) + +EVENT_MAPPING: Dict[str, Tuple[Callable, str]] = { + "DebtMint": (NostraDataParser.parse_debt_mint_event, "save_debt_mint_event"), + "DebtBurn": (NostraDataParser.parse_debt_burn_event, "save_debt_burn_event"), + "DebtTransfer": ( + NostraDataParser.parse_debt_transfer_event, + "save_debt_transfer_event", + ), + "BearingCollateralBurn": ( + NostraDataParser.parse_interest_bearing_collateral_burn_event, + "save_bearing_collateral_burn_event", + ), +} + + +class NostraTransformer: + """ + A class that is used to transform Nostra events into database models. + """ + + PROTOCOL_ADDRESSES: str = ProtocolAddresses().NOSTRA_ALPHA_ADDRESSES + PROTOCOL_TYPE: ProtocolIDs = ProtocolIDs.NOSTRA_ALPHA + PAGINATION_SIZE: int = 1000 + + def __init__(self): + """ + Initialize the NostraTransformer instance. + Initializes API and database connectors, and retrieves the last processed block number. 
+ """ + self.api_connector = DeRiskAPIConnector() + self.db_connector = NostraEventDBConnector() + self.last_block = self.db_connector.get_last_block(self.PROTOCOL_TYPE) + self.data_parser = NostraDataParser() + + self.EVENT_MAPPING: Dict[str, Tuple[Callable, str], Type[Base]] = { + "DebtMint": ( + self.data_parser.parse_debt_mint_event, + "save_debt_mint_event", + ), + "DebtBurn": ( + self.data_parser.parse_debt_burn_event, + "save_debt_burn_event", + ), + "DebtTransfer": ( + self.data_parser.parse_debt_transfer_event, + "save_debt_transfer_event", + ), + "BearingCollateralBurn": ( + self.data_parser.parse_interest_bearing_collateral_burn_event, + "save_bearing_collateral_burn_event", + ), + } + + def fetch_and_transform_events( + self, from_address: str, min_block: int, max_block: int + ) -> None: + """ + Fetch events from the DeRisk API and transform them into database models. + """ + # Fetch events using the API connector + response = self.api_connector.get_data( + from_address=from_address, + min_block_number=min_block, + max_block_number=max_block, + ) + + if "error" in response: + raise ValueError(f"Error fetching events: {response['error']}") + + # Process each event based on its type + for event in response: + event_type = event.get("key_name") + if event_type in self.EVENT_MAPPING: + parser_func, save_to_db_method_name = self.EVENT_MAPPING[event_type] + parsed_data = parser_func(event["data"]) + + getattr(self, save_to_db_method_name)( + event_name=event_type, + block_number=event.get("block_number"), + event_data=parsed_data, + ) + else: + logger.info(f"Event type {event_type} not supported, yet...") + + def save_debt_mint_event( + self, event_name: str, block_number: int, event_data: DebtMintEventData + ) -> None: + """ + Save a DebtMint event to the database. 
+ """ + self.db_connector.create_debt_mint_event( + protocol_id=self.PROTOCOL_TYPE.value, + event_name=event_name, + block_number=block_number, + event_data={ + "user": event_data.user, + "amount": Decimal(event_data.amount), + }, + ) + logger.info( + f"Saved DebtMint event: User={event_data.user}, Amount={event_data.amount}" + ) + + def save_debt_burn_event( + self, event_name: str, block_number: int, event_data: DebtBurnEventData + ) -> None: + """ + Save a DebtBurn event to the database. + """ + self.db_connector.create_debt_burn_event( + protocol_id=self.PROTOCOL_TYPE.value, + event_name=event_name, + block_number=block_number, + event_data={ + "user": event_data.user, + "amount": Decimal(event_data.amount), + }, + ) + logger.info( + f"Saved DebtBurn event: User={event_data.user}, Amount={event_data.amount}" + ) + + def save_debt_transfer_event( + self, event_name: str, block_number: int, event_data: DebtTransferEventData + ) -> None: + """ + Save a DebtTransfer event to the database. + """ + self.db_connector.create_debt_transfer_event( + protocol_id=self.PROTOCOL_TYPE.value, + event_name=event_name, + block_number=block_number, + event_data={ + "sender": event_data.sender, + "recipient": event_data.recipient, + "amount": Decimal(event_data.amount), + }, + ) + logger.info( + f"Saved DebtTransfer event: Sender={event_data.sender}, Recipient={event_data.recipient}, Amount={event_data.amount}" + ) + + def save_bearing_collateral_burn_event( + self, + event_name: str, + block_number: int, + event_data: BearingCollateralBurnEventData, + ) -> None: + """ + Save a BearingCollateralBurn event to the database. 
+ """ + self.db_connector.create_bearing_collateral_burn_event( + protocol_id=self.PROTOCOL_TYPE.value, + event_name=event_name, + block_number=block_number, + event_data={ + "user": event_data.user, + "amount": Decimal(event_data.amount), + }, + ) + logger.info( + f"Saved BearingCollateralBurn event: User={event_data.user}, Amount={event_data.amount}" + ) + + def run(self) -> None: + """ + Run the NostraTransformer class. + """ + max_retries = 5 + retry = 0 + while retry < max_retries: + try: + self.fetch_and_transform_events( + from_address=self.PROTOCOL_ADDRESSES, + min_block=self.last_block, + max_block=self.last_block + self.PAGINATION_SIZE, + ) + self.last_block += self.PAGINATION_SIZE + retry += 1 + except Exception as e: + logger.error(f"Error during fetching or saving events: {e}") + if retry == max_retries: + logger.info(f"Reached max retries for address {self.PROTOCOL_ADDRESSES}") + + +if __name__ == "__main__": + """ + This is the init function for when NostraTransformer class is called directly. + """ + transformer = NostraTransformer() + transformer.run() diff --git a/apps/data_handler/tests/conftest.py b/apps/data_handler/tests/conftest.py index e6e55f9e..6d528d76 100644 --- a/apps/data_handler/tests/conftest.py +++ b/apps/data_handler/tests/conftest.py @@ -2,16 +2,19 @@ This module contains the fixtures for the tests. 
""" -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest from data_handler.db.crud import ( DBConnector, InitializerDBConnector, - ZkLendEventDBConnector + NostraEventDBConnector, + ZkLendEventDBConnector, ) from data_handler.handler_tools.api_connector import DeRiskAPIConnector +from data_handler.handler_tools.data_parser.nostra import NostraDataParser from data_handler.handler_tools.data_parser.zklend import ZklendDataParser +from data_handler.handlers.events.nostra.transform_events import NostraTransformer @pytest.fixture(scope="module") @@ -64,3 +67,51 @@ def mock_zklend_data_parser(): """ mock_zklend_data_parser = MagicMock(spec=ZklendDataParser) yield mock_zklend_data_parser + + +@pytest.fixture(scope="function") +def mock_nostra_event_db_connector(): + """ + Mock for NostraEventDBConnector + :return: None + """ + mock_nostra_event_db_connector = MagicMock(spec=NostraEventDBConnector) + mock_nostra_event_db_connector.get_last_block.return_value = 0 + mock_nostra_event_db_connector.create_debt_mint_event = MagicMock() + mock_nostra_event_db_connector.create_debt_burn_event = MagicMock() + mock_nostra_event_db_connector.create_debt_transfer_event = MagicMock() + mock_nostra_event_db_connector.create_bearing_collateral_burn_event = MagicMock() + yield mock_nostra_event_db_connector + + +@pytest.fixture(scope="function") +def mock_nostra_data_parser(): + """ + Mock for NostraDataParser + :return: None + """ + mock_nostra_data_parser = MagicMock(spec=NostraDataParser) + yield mock_nostra_data_parser + + +@pytest.fixture(scope="function") +def transformer( + mock_api_connector, + mock_nostra_event_db_connector, + mock_nostra_data_parser, +): + """ + Creates an instance of NostraTransformer with mocked dependencies. 
+ """ + with patch( + "data_handler.handlers.events.nostra.transform_events.NostraEventDBConnector", + return_value=mock_nostra_event_db_connector, + ), patch( + "data_handler.handlers.events.nostra.transform_events.DeRiskAPIConnector", + return_value=mock_api_connector, + ): + transformer = NostraTransformer() + transformer.api_connector = mock_api_connector + transformer.db_connector = mock_nostra_event_db_connector + transformer.data_parser = mock_nostra_data_parser + return transformer diff --git a/apps/data_handler/tests/test_nostra_transformer.py b/apps/data_handler/tests/test_nostra_transformer.py new file mode 100644 index 00000000..8daa49cd --- /dev/null +++ b/apps/data_handler/tests/test_nostra_transformer.py @@ -0,0 +1,254 @@ +""" +Test the nostra transformer +""" + +from decimal import Decimal +from typing import Any, Dict +from unittest.mock import MagicMock, patch + +import pytest +from data_handler.handler_tools.data_parser.serializers import ( + BearingCollateralBurnEventData, + DebtBurnEventData, + DebtMintEventData, + DebtTransferEventData, +) +from data_handler.handlers.events.nostra.transform_events import NostraTransformer +from shared.constants import ProtocolIDs + + +@pytest.fixture(scope="function") +def sample_debt_mint_event_data() -> Dict[str, Any]: + """ + Sample debt mint event data + """ + return { + "id": "0x00a00637ed8fd6f3f83a1eb743a36c894c6fe5af5a87e8ab35697afb3422967e_3", + "block_hash": "0x07d1b221c40b6a19c0381d61ebbe8d048018b49314ae1bdc93938059e29febdf", + "block_number": 630008, + "transaction_hash": "0x00a00637ed8fd6f3f83a1eb743a36c894c6fe5af5a87e8ab35697afb3422967e", + "event_index": 3, + "from_address": "0x04c0a5193d58f74fbace4b74dcf65481e734ed1714121bdc571da345540efa05", + "keys": ["0xfa3f9acdb7b24dcf6d40d77ff2f87a87bca64a830a2169aebc9173db23ff41"], + "data": [ + "0x1a0027d1bf86904d1051fe0ca94c39b659135f19504d663d66771a7424ca2eb", # user + "0x9184e72a000", # amount in hex + ], + "timestamp": 1712276824, + "key_name": 
"DebtMint", + } + + +@pytest.fixture(scope="function") +def sample_debt_burn_event_data() -> Dict[str, Any]: + """ + Sample debt burn event data + """ + return { + "id": "0x0216d505e065501a8f26be71516b6f624dedff0dee50c5ccbc33142f378d8028_5", + "block_hash": "0x056914ef72facffc6e7fbb651d7ee91fa3a0bcc49de3058129ece5c706a72bd8", + "block_number": 630004, + "transaction_hash": "0x0216d505e065501a8f26be71516b6f624dedff0dee50c5ccbc33142f378d8028", + "event_index": 5, + "from_address": "0x04c0a5193d58f74fbace4b74dcf65481e734ed1714121bdc571da345540efa05", + "keys": ["0x7ae0ab7952bbfc33a72035e5eccec7c8816723421c0acb315bd4690a71d46e"], + "data": [ + "0x7f121e44b3f446cdcaa28b230546956208d51e96894acc3b482947356bc10ed", # user + "0x9184e72a000", # amount in hex + ], + "timestamp": 1712275350, + "key_name": "DebtBurn", + } + + +@pytest.fixture(scope="function") +def sample_debt_transfer_event_data() -> Dict[str, Any]: + """ + Sample debt transfer event data + """ + return { + "id": "0x04123456789abcdef0abcdef1234567890abcdef1234567890abcdef12345678_2", + "block_hash": "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef", + "block_number": 630010, + "transaction_hash": "0x04123456789abcdef0abcdef1234567890abcdef1234567890abcdef12345678", + "event_index": 2, + "from_address": "0x04c0a5193d58f74fbace4b74dcf65481e734ed1714121bdc571da345540efa05", + "keys": ["0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd"], + "data": [ + "0xabcdef1234567890abcdef1234567890abcdef12", # sender + "0x1234567890abcdef1234567890abcdef12345678", # recipient + "0x1bc16d674ec80000", # amount in hex (2e18) + ], + "timestamp": 1712278000, + "key_name": "DebtTransfer", + } + + +@pytest.fixture(scope="function") +def sample_bearing_collateral_burn_event_data() -> Dict[str, Any]: + """ + Sample bearing collateral burn event data + """ + return { + "id": "0x07abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd_0", + "block_hash": 
"0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd", + "block_number": 630013, + "transaction_hash": "0x07abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd", + "event_index": 0, + "from_address": "0x04c0a5193d58f74fbace4b74dcf65481e734ed1714121bdc571da345540efa05", + "keys": ["0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd"], + "data": [ + "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd", # user + "0x2386f26fc10000", # amount in hex (0.1e18) + ], + "timestamp": 1712278300, + "key_name": "BearingCollateralBurn", + } + + +def test_save_debt_mint_event(transformer, sample_debt_mint_event_data): + """ + Test saving a debt mint event. + """ + # Setup API response + transformer.api_connector.get_data.return_value = [sample_debt_mint_event_data] + + expected_parsed_data = DebtMintEventData( + user=sample_debt_mint_event_data["data"][0], + amount=Decimal(int(sample_debt_mint_event_data["data"][1], 16)), + ) + + # Call the method + transformer.fetch_and_transform_events( + from_address=transformer.PROTOCOL_ADDRESSES, min_block=0, max_block=1000 + ) + + # Verify DB connector was called with correct data + transformer.db_connector.create_debt_mint_event.assert_called_once_with( + protocol_id=ProtocolIDs.NOSTRA_ALPHA.value, + event_name=sample_debt_mint_event_data["key_name"], + block_number=sample_debt_mint_event_data["block_number"], + event_data={ + "user": expected_parsed_data.user, + "amount": expected_parsed_data.amount, + }, + ) + + +def test_save_debt_burn_event(transformer, sample_debt_burn_event_data): + """ + Test saving a debt burn event. 
+ """ + transformer.api_connector.get_data.return_value = [sample_debt_burn_event_data] + + expected_parsed_data = DebtBurnEventData( + user=sample_debt_burn_event_data["data"][0], + amount=Decimal(int(sample_debt_burn_event_data["data"][1], 16)), + ) + + transformer.fetch_and_transform_events( + from_address=transformer.PROTOCOL_ADDRESSES, min_block=0, max_block=1000 + ) + + transformer.db_connector.create_debt_burn_event.assert_called_once_with( + protocol_id=ProtocolIDs.NOSTRA_ALPHA.value, + event_name=sample_debt_burn_event_data["key_name"], + block_number=sample_debt_burn_event_data["block_number"], + event_data={ + "user": expected_parsed_data.user, + "amount": expected_parsed_data.amount, + }, + ) + + +def test_save_debt_transfer_event(transformer, sample_debt_transfer_event_data): + """ + Test saving a debt transfer event. + """ + transformer.api_connector.get_data.return_value = [sample_debt_transfer_event_data] + + expected_parsed_data = DebtTransferEventData( + sender=sample_debt_transfer_event_data["data"][0], + recipient=sample_debt_transfer_event_data["data"][1], + amount=Decimal(int(sample_debt_transfer_event_data["data"][2], 16)), + ) + + transformer.fetch_and_transform_events( + from_address=transformer.PROTOCOL_ADDRESSES, min_block=0, max_block=1000 + ) + + transformer.db_connector.create_debt_transfer_event.assert_called_once_with( + protocol_id=ProtocolIDs.NOSTRA_ALPHA.value, + event_name=sample_debt_transfer_event_data["key_name"], + block_number=sample_debt_transfer_event_data["block_number"], + event_data={ + "sender": expected_parsed_data.sender, + "recipient": expected_parsed_data.recipient, + "amount": expected_parsed_data.amount, + }, + ) + + +def test_save_bearing_collateral_burn_event( + transformer, sample_bearing_collateral_burn_event_data +): + """ + Test saving a bearing collateral burn event. 
+ """ + transformer.api_connector.get_data.return_value = [ + sample_bearing_collateral_burn_event_data + ] + + expected_parsed_data = BearingCollateralBurnEventData( + user=sample_bearing_collateral_burn_event_data["data"][0], + amount=Decimal(int(sample_bearing_collateral_burn_event_data["data"][1], 16)), + ) + + transformer.fetch_and_transform_events( + from_address=transformer.PROTOCOL_ADDRESSES, min_block=0, max_block=1000 + ) + + transformer.db_connector.create_bearing_collateral_burn_event.assert_called_once_with( + protocol_id=ProtocolIDs.NOSTRA_ALPHA.value, + event_name=sample_bearing_collateral_burn_event_data["key_name"], + block_number=sample_bearing_collateral_burn_event_data["block_number"], + event_data={ + "user": expected_parsed_data.user, + "amount": expected_parsed_data.amount, + }, + ) + + +def test_unsupported_event_type(transformer): + """ + Test handling of unsupported event types. + """ + unsupported_event = { + "key_name": "UnsupportedEvent", + "data": [], + "block_number": 1000, + } + transformer.api_connector.get_data.return_value = [unsupported_event] + + # Should not raise an exception + transformer.fetch_and_transform_events( + from_address=transformer.PROTOCOL_ADDRESSES, min_block=0, max_block=1000 + ) + + # Verify no DB calls were made for Nostra events + transformer.db_connector.create_debt_mint_event.assert_not_called() + transformer.db_connector.create_debt_burn_event.assert_not_called() + transformer.db_connector.create_debt_transfer_event.assert_not_called() + transformer.db_connector.create_bearing_collateral_burn_event.assert_not_called() + + +def test_api_error_handling(transformer): + """ + Test handling of API errors. + """ + transformer.api_connector.get_data.return_value = {"error": "API Error"} + + with pytest.raises(ValueError, match="Error fetching events: API Error"): + transformer.fetch_and_transform_events( + from_address=transformer.PROTOCOL_ADDRESSES, min_block=0, max_block=1000 + )