Skip to content

Commit

Permalink
Merge pull request CarmineOptions#330 from CarmineOptions/fix/pipeline
Browse files Browse the repository at this point in the history
fix pipeline
  • Loading branch information
djeck1432 authored and MrRoudyk committed Nov 26, 2024
2 parents 3d6f8c7 + e699ae9 commit 92138bd
Show file tree
Hide file tree
Showing 10 changed files with 870 additions and 112 deletions.
8 changes: 1 addition & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
name: CI Workflow

on:
push:
branches:
- master
pull_request:
branches:
- master
on: [push, pull_request]


jobs:
Expand Down
8 changes: 1 addition & 7 deletions .github/workflows/pylint.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
name: Pylint Check

on:
push:
branches:
- master
pull_request:
branches:
- master
on: [push, pull_request]

jobs:
lint:
Expand Down
19 changes: 11 additions & 8 deletions apps/data_handler/celery_app/celery_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
# run_loan_states_computation_for_hashtack_v0,;
# run_loan_states_computation_for_hashtack_v1,; run_loan_states_computation_for_nostra_alpha,;
# run_loan_states_computation_for_nostra_mainnet,; run_loan_states_computation_for_zklend,;
# run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v1,; uniswap_v2_order_book,

# from data_handler.celery_app.tasks import (
Expand Down Expand Up @@ -68,20 +68,23 @@
"task": "ekubo_order_book",
"schedule": ORDER_BOOK_TIME_INTERVAL,
},
f"process_zklend_events_{CRONTAB_TIME}_mins": {
"task": "process_zklend_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
f"process_zklend_events_{CRONTAB_TIME}_mins": {
"task": "process_zklend_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
},
f"process_nostra_events_{CRONTAB_TIME}_mins": {
"task": "process_nostra_events",
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
},
}

from data_handler.celery_app.order_books_tasks import ekubo_order_book
from data_handler.celery_app.tasks import (
run_liquidable_debt_computation_for_zklend, )
from data_handler.celery_app.tasks import run_liquidable_debt_computation_for_zklend

# run_loan_states_computation_for_hashtack_v0,; run_loan_states_computation_for_hashtack_v1,;
# run_loan_states_computation_for_nostra_alpha,; run_loan_states_computation_for_nostra_mainnet,;
# run_loan_states_computation_for_zklend,; run_liquidable_debt_computation_for_nostra_alpha,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_nostra_mainnet,;
# run_liquidable_debt_computation_for_hashstack_v0,;
# run_liquidable_debt_computation_for_hashstack_v1,; uniswap_v2_order_book,

Expand Down
58 changes: 53 additions & 5 deletions apps/data_handler/celery_app/event_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from datetime import datetime

from data_handler.celery_app.celery_conf import app
from data_handler.handlers.events.nostra.transform_events import NostraTransformer
from data_handler.handlers.events.zklend.transform_events import ZklendTransformer


logger = logging.getLogger(__name__)


@app.task(name="process_zklend_events")
def process_zklend_events():
"""
Expand All @@ -30,21 +31,68 @@ def process_zklend_events():
"Successfully processed ZkLend events in %.2fs. Blocks: %d to %d",
execution_time,
transformer.last_block - transformer.PAGINATION_SIZE,
transformer.last_block
transformer.last_block,
)
except (ValueError, TypeError, RuntimeError) as exc: # Catching more specific exceptions
except (
ValueError,
TypeError,
RuntimeError,
) as exc: # Catching more specific exceptions
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Error processing ZkLend events after %.2fs: %s",
execution_time,
exc,
exc_info=True
exc_info=True,
)
except Exception as exc: # Still keeping a general exception catch as a fallback
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Unexpected error processing ZkLend events after %.2fs: %s",
execution_time,
exc,
exc_info=True
exc_info=True,
)


@app.task(name="process_nostra_events")
def process_nostra_events():
"""
Process and store Nostra protocol events.
Fetches events from the blockchain, transforms them into the required format,
and saves them to the database.
"""
start_time = datetime.utcnow()
logger.info("Starting Nostra event processing")
try:
# Initialize and run transformer
transformer = NostraTransformer()
transformer.run()
# Log success metrics
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.info(
"Successfully processed Nostra events in %.2fs. Blocks: %d to %d",
execution_time,
transformer.last_block - transformer.PAGINATION_SIZE,
transformer.last_block,
)
except (
ValueError,
TypeError,
RuntimeError,
) as exc: # Catching more specific exceptions
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Error processing Nostra events after %.2fs: %s",
execution_time,
exc,
exc_info=True,
)
except Exception as exc: # Still keeping a general exception catch as a fallback
execution_time = (datetime.utcnow() - start_time).total_seconds()
logger.error(
"Unexpected error processing Nostra events after %.2fs: %s",
execution_time,
exc,
exc_info=True,
)
Loading

0 comments on commit 92138bd

Please sign in to comment.