Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Script for saving Nostra events to db #324

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions apps/data_handler/celery_app/celery_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
# run_loan_states_computation_for_hashtack_v0,;
# run_loan_states_computation_for_hashtack_v1,; run_loan_states_computation_for_nostra_alpha,;
# run_loan_states_computation_for_nostra_mainnet,; run_loan_states_computation_for_zklend,;
# run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v1,; uniswap_v2_order_book,

# from data_handler.celery_app.tasks import (
Expand Down Expand Up @@ -68,20 +68,23 @@
"task": "ekubo_order_book",
"schedule": ORDER_BOOK_TIME_INTERVAL,
},
f"process_zklend_events_{CRONTAB_TIME}_mins": {
"task": "process_zklend_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
f"process_zklend_events_{CRONTAB_TIME}_mins": {
"task": "process_zklend_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
},
f"process_nostra_events_{CRONTAB_TIME}_mins": {
"task": "process_nostra_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
},
}

from data_handler.celery_app.order_books_tasks import ekubo_order_book
from data_handler.celery_app.tasks import (
run_liquidable_debt_computation_for_zklend, )
from data_handler.celery_app.tasks import run_liquidable_debt_computation_for_zklend

# run_loan_states_computation_for_hashtack_v0,; run_loan_states_computation_for_hashtack_v1,;
# run_loan_states_computation_for_nostra_alpha,; run_loan_states_computation_for_nostra_mainnet,;
# run_loan_states_computation_for_zklend,; run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v1,; uniswap_v2_order_book,

Expand Down
58 changes: 53 additions & 5 deletions apps/data_handler/celery_app/event_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from datetime import datetime

from data_handler.celery_app.celery_conf import app
from data_handler.handlers.events.nostra.transform_events import NostraTransformer
from data_handler.handlers.events.zklend.transform_events import ZklendTransformer


logger = logging.getLogger(__name__)


@app.task(name="process_zklend_events")
def process_zklend_events():
"""
Expand All @@ -30,21 +31,68 @@ def process_zklend_events():
"Successfully processed ZkLend events in %.2fs. Blocks: %d to %d",
execution_time,
transformer.last_block - transformer.PAGINATION_SIZE,
transformer.last_block
transformer.last_block,
)
except (ValueError, TypeError, RuntimeError) as exc: # Catching more specific exceptions
except (
ValueError,
TypeError,
RuntimeError,
) as exc: # Catching more specific exceptions
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Error processing ZkLend events after %.2fs: %s",
execution_time,
exc,
exc_info=True
exc_info=True,
)
except Exception as exc: # Still keeping a general exception catch as a fallback
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Unexpected error processing ZkLend events after %.2fs: %s",
execution_time,
exc,
exc_info=True
exc_info=True,
)


@app.task(name="process_nostra_events")
def process_nostra_events():
"""
Process and store Nostra protocol events.
Fetches events from the blockchain, transforms them into the required format,
and saves them to the database.
"""
start_time = datetime.utcnow()
logger.info("Starting Nostra event processing")
try:
# Initialize and run transformer
transformer = NostraTransformer()
transformer.run()
# Log success metrics
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.info(
"Successfully processed Nostra events in %.2fs. Blocks: %d to %d",
execution_time,
transformer.last_block - transformer.PAGINATION_SIZE,
transformer.last_block,
)
except (
ValueError,
TypeError,
RuntimeError,
) as exc: # Catching more specific exceptions
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Error processing Nostra events after %.2fs: %s",
execution_time,
exc,
exc_info=True,
)
except Exception as exc: # Still keeping a general exception catch as a fallback
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Unexpected error processing Nostra events after %.2fs: %s",
execution_time,
exc,
exc_info=True,
)
Loading