Skip to content

Commit

Permalink
Merge pull request #324 from MrRoudyk/feature/script-for-saving-nostr…
Browse files Browse the repository at this point in the history
…a-events-to-db

feat: Script for saving Nostra events to db
  • Loading branch information
djeck1432 authored Nov 27, 2024
2 parents ded72e5 + cdc745d commit 39a820c
Show file tree
Hide file tree
Showing 8 changed files with 868 additions and 98 deletions.
19 changes: 11 additions & 8 deletions apps/data_handler/celery_app/celery_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
# run_loan_states_computation_for_hashtack_v0,;
# run_loan_states_computation_for_hashtack_v1,; run_loan_states_computation_for_nostra_alpha,;
# run_loan_states_computation_for_nostra_mainnet,; run_loan_states_computation_for_zklend,;
# run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v1,; uniswap_v2_order_book,

# from data_handler.celery_app.tasks import (
Expand Down Expand Up @@ -68,20 +68,23 @@
"task": "ekubo_order_book",
"schedule": ORDER_BOOK_TIME_INTERVAL,
},
f"process_zklend_events_{CRONTAB_TIME}_mins": {
"task": "process_zklend_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
f"process_zklend_events_{CRONTAB_TIME}_mins": {
"task": "process_zklend_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
},
f"process_nostra_events_{CRONTAB_TIME}_mins": {
"task": "process_nostra_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
},
}

from data_handler.celery_app.order_books_tasks import ekubo_order_book
from data_handler.celery_app.tasks import (
run_liquidable_debt_computation_for_zklend, )
from data_handler.celery_app.tasks import run_liquidable_debt_computation_for_zklend

# run_loan_states_computation_for_hashtack_v0,; run_loan_states_computation_for_hashtack_v1,;
# run_loan_states_computation_for_nostra_alpha,; run_loan_states_computation_for_nostra_mainnet,;
# run_loan_states_computation_for_zklend,; run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v1,; uniswap_v2_order_book,

Expand Down
58 changes: 53 additions & 5 deletions apps/data_handler/celery_app/event_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from datetime import datetime

from data_handler.celery_app.celery_conf import app
from data_handler.handlers.events.nostra.transform_events import NostraTransformer
from data_handler.handlers.events.zklend.transform_events import ZklendTransformer


logger = logging.getLogger(__name__)


@app.task(name="process_zklend_events")
def process_zklend_events():
"""
Expand All @@ -30,21 +31,68 @@ def process_zklend_events():
"Successfully processed ZkLend events in %.2fs. Blocks: %d to %d",
execution_time,
transformer.last_block - transformer.PAGINATION_SIZE,
transformer.last_block
transformer.last_block,
)
except (ValueError, TypeError, RuntimeError) as exc: # Catching more specific exceptions
except (
ValueError,
TypeError,
RuntimeError,
) as exc: # Catching more specific exceptions
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Error processing ZkLend events after %.2fs: %s",
execution_time,
exc,
exc_info=True
exc_info=True,
)
except Exception as exc: # Still keeping a general exception catch as a fallback
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Unexpected error processing ZkLend events after %.2fs: %s",
execution_time,
exc,
exc_info=True
exc_info=True,
)


@app.task(name="process_nostra_events")
def process_nostra_events():
"""
Process and store Nostra protocol events.
Fetches events from the blockchain, transforms them into the required format,
and saves them to the database.
"""
start_time = datetime.utcnow()
logger.info("Starting Nostra event processing")
try:
# Initialize and run transformer
transformer = NostraTransformer()
transformer.run()
# Log success metrics
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.info(
"Successfully processed Nostra events in %.2fs. Blocks: %d to %d",
execution_time,
transformer.last_block - transformer.PAGINATION_SIZE,
transformer.last_block,
)
except (
ValueError,
TypeError,
RuntimeError,
) as exc: # Catching more specific exceptions
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Error processing Nostra events after %.2fs: %s",
execution_time,
exc,
exc_info=True,
)
except Exception as exc: # Still keeping a general exception catch as a fallback
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Unexpected error processing Nostra events after %.2fs: %s",
execution_time,
exc,
exc_info=True,
)
Loading

0 comments on commit 39a820c

Please sign in to comment.