diff --git a/README.md b/README.md index fba63b7..2673bac 100755 --- a/README.md +++ b/README.md @@ -192,7 +192,7 @@ NOTE: It is recommended to run `build.sh` in a screen or tmux session so that th git clone https://github.com/PowerLoom/snapshotter-lite-v2.git powerloom-pre-mainnet ``` This will clone the repository into a directory named `powerloom`. - + 3. Change your working directory to the `powerloom-pre-mainnet` directory: ```bash cd powerloom-pre-mainnet @@ -233,7 +233,7 @@ If you want to run the Snapshotter Lite Node without Docker, you need to make su git clone https://github.com/PowerLoom/snapshotter-lite-v2.git powerloom-pre-mainnet ``` This will clone the repository into a directory named `powerloom`. - + 2. Change your working directory to the `powerloom-pre-mainnet` directory: ```bash cd powerloom-pre-mainnet @@ -248,9 +248,17 @@ If you want to run the Snapshotter Lite Node without Docker, you need to make su 5. Your node should start in background and you should start seeing logs in your terminal. 6. To stop the node, you can run `pkill -f snapshotter` in a new terminal window. - + ## Monitoring and Debugging +### Monitoring + +To enable Telegram reporting for snapshotter issues: +1. Open the conversation with [@PowerloomReportingBot](https://t.me/PowerloomReportingBot) in the Telegram App and start a conversation. +2. Start the bot by typing the `/start` command in the chat. You will receive a response containing your `Chat ID` for the bot. +3. Enter the `Chat ID` when prompted on node startup. +4. You will now receive an error report whenever your node fails to process an epoch or snapshot. + ### Debugging Usually the easiest way to fix node related issues is to restart the node. If you're facing issues with the node, you can try going through the logs present in the `logs` directory. If you're unable to find the issue, you can reach out to us on [Discord](https://powerloom.io/discord) and we will be happy to help you out. 
diff --git a/build-dev.sh b/build-dev.sh index f627103..068c08d 100755 --- a/build-dev.sh +++ b/build-dev.sh @@ -34,6 +34,13 @@ if [ ! -f .env ]; then sed -i'.backup' "s##$SLOT_ID#" .env fi + # ask user for TELEGRAM_CHAT_ID and replace it in .env + if [ -z "$TELEGRAM_CHAT_ID" ]; then + echo "Enter Your TELEGRAM_CHAT_ID (Optional, leave blank to skip.): "; + read TELEGRAM_CHAT_ID; + sed -i'.backup' "s##$TELEGRAM_CHAT_ID#" .env + fi + fi source .env diff --git a/build.sh b/build.sh index d53457c..7eda8fb 100755 --- a/build.sh +++ b/build.sh @@ -34,6 +34,13 @@ if [ ! -f .env ]; then sed -i'.backup' "s##$SLOT_ID#" .env fi + # ask user for TELEGRAM_CHAT_ID and replace it in .env + if [ -z "$TELEGRAM_CHAT_ID" ]; then + echo "Enter Your TELEGRAM_CHAT_ID (Optional, leave blank to skip.): "; + read TELEGRAM_CHAT_ID; + sed -i'.backup' "s##$TELEGRAM_CHAT_ID#" .env + fi + fi source .env diff --git a/docker-compose-dev.yaml b/docker-compose-dev.yaml index 0233d8b..8a117f4 100755 --- a/docker-compose-dev.yaml +++ b/docker-compose-dev.yaml @@ -53,6 +53,8 @@ services: - POWERLOOM_REPORTING_URL=$POWERLOOM_REPORTING_URL - WEB3_STORAGE_TOKEN=$WEB3_STORAGE_TOKEN - NAMESPACE=$NAMESPACE + - TELEGRAM_REPORTING_URL=$TELEGRAM_REPORTING_URL + - TELEGRAM_CHAT_ID=$TELEGRAM_CHAT_ID healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8002/health"] interval: 10s diff --git a/docker-compose.yaml b/docker-compose.yaml index ed292fd..61c3438 100755 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -54,6 +54,8 @@ services: - POWERLOOM_REPORTING_URL=$POWERLOOM_REPORTING_URL - WEB3_STORAGE_TOKEN=$WEB3_STORAGE_TOKEN - NAMESPACE=$NAMESPACE + - TELEGRAM_REPORTING_URL=$TELEGRAM_REPORTING_URL + - TELEGRAM_CHAT_ID=$TELEGRAM_CHAT_ID healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8002/health"] interval: 10s diff --git a/env.example b/env.example index 01b0e9b..c365f70 100755 --- a/env.example +++ b/env.example @@ -19,3 +19,5 @@ IPFS_API_SECRET= SLACK_REPORTING_URL= WEB3_STORAGE_TOKEN= 
DASHBOARD_ENABLED= +TELEGRAM_REPORTING_URL=https://tg-testing.powerloom.io/ +TELEGRAM_CHAT_ID= diff --git a/snapshotter/processor_distributor.py b/snapshotter/processor_distributor.py index 70caadb..667408d 100644 --- a/snapshotter/processor_distributor.py +++ b/snapshotter/processor_distributor.py @@ -11,8 +11,8 @@ from web3 import Web3 import time +from snapshotter.utils.models.data_models import SnapshotterReportData from snapshotter.utils.models.data_models import SnapshotterReportState -from snapshotter.utils.models.data_models import SnapshotterIssue from snapshotter.settings.config import projects_config from snapshotter.settings.config import settings from snapshotter.utils.data_utils import get_snapshot_submision_window @@ -31,6 +31,9 @@ from snapshotter.utils.snapshot_worker import SnapshotAsyncWorker from snapshotter.utils.snapshot_utils import get_eth_price_usd from snapshotter.utils.callback_helpers import send_failure_notifications_async +from snapshotter.utils.models.data_models import SnapshotterIssue +from snapshotter.utils.models.data_models import SnapshotterStatus + class ProcessorDistributor: _anchor_rpc_helper: RpcHelper @@ -132,7 +135,9 @@ async def init(self): self._epoch_size = epoch_size try: - snapshotter_address = self._protocol_state_contract.functions.slotSnapshotterMapping(settings.slot_id).call() + snapshotter_address = self._protocol_state_contract.functions.slotSnapshotterMapping( + settings.slot_id, + ).call() if snapshotter_address != to_checksum_address(settings.instance_id): self._logger.error('Signer Account is not the one configured in slot, exiting!') exit(0) @@ -229,13 +234,18 @@ async def _epoch_release_processor(self, message: EpochReleasedEvent): 'Exception in getting eth price: {}', e, ) - notification_message = SnapshotterIssue( - instanceID=settings.instance_id, - issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, - projectID='ETH_PRICE_LOAD', - epochId=str(message.epochId), - timeOfReporting=str(time.time()), - 
extra=json.dumps({'issueDetails': f'Error : {e}'}), + self.snapshot_worker.status.totalMissedSubmissions += 1 + self.snapshot_worker.status.consecutiveMissedSubmissions += 1 + notification_message = SnapshotterReportData( + snapshotterIssue=SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, + projectID='ETH_PRICE_LOAD', + epochId=str(message.epochId), + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': f'Error : {e}'}), + ), + snapshotterStatus=self.snapshot_worker.status, ) await send_failure_notifications_async( client=self._client, message=notification_message, @@ -246,7 +256,7 @@ async def _epoch_release_processor(self, message: EpochReleasedEvent): # release for snapshotting asyncio.ensure_future( self._distribute_callbacks_snapshotting( - project_type, epoch, eth_price_dict + project_type, epoch, eth_price_dict, ), ) diff --git a/snapshotter/system_event_detector.py b/snapshotter/system_event_detector.py index bdfbfbd..d50f362 100644 --- a/snapshotter/system_event_detector.py +++ b/snapshotter/system_event_detector.py @@ -1,3 +1,4 @@ +import json import asyncio import multiprocessing import resource @@ -12,12 +13,15 @@ import sys from snapshotter.processor_distributor import ProcessorDistributor from snapshotter.settings.config import settings +from snapshotter.utils.callback_helpers import send_epoch_processing_failure_notification_sync from snapshotter.utils.default_logger import logger from snapshotter.utils.exceptions import GenericExitOnSignal from snapshotter.utils.file_utils import read_json_file from snapshotter.utils.models.data_models import DailyTaskCompletedEvent from snapshotter.utils.models.data_models import DayStartedEvent from snapshotter.utils.models.data_models import EpochReleasedEvent +from snapshotter.utils.models.data_models import EpochProcessingIssue +from snapshotter.utils.models.data_models import SnapshotterReportState from snapshotter.utils.rpc import 
get_event_sig_and_abi from snapshotter.utils.rpc import RpcHelper from urllib.parse import urljoin @@ -76,6 +80,7 @@ def __init__(self, name, **kwargs): abi=self.contract_abi, ) self._last_reporting_service_ping = 0 + self._last_reporting_message_sent = 0 # event EpochReleased(uint256 indexed epochId, uint256 begin, uint256 end, uint256 timestamp); # event DayStartedEvent(uint256 dayId, uint256 timestamp); @@ -132,6 +137,16 @@ async def _init_check_and_report(self): '❌ Dummy Event processing failed! Error: {}', e, ) self._logger.info("Please check your config and if issue persists please reach out to the team!") + notification_message = EpochProcessingIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.UNHEALTHY_EPOCH_PROCESSING.value, + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': f'Error : {e}'}) + ) + send_epoch_processing_failure_notification_sync( + client=self._httpx_client, + message=notification_message + ) sys.exit(1) async def get_events(self, from_block: int, to_block: int): @@ -251,6 +266,19 @@ async def _detect_events(self): settings.rpc.polling_interval, ) + if int(time.time()) - self._last_reporting_message_sent >= 600: + self._last_reporting_message_sent = int(time.time()) + notification_message = EpochProcessingIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.UNHEALTHY_EPOCH_PROCESSING.value, + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': f'Error : {e}'}) + ) + send_epoch_processing_failure_notification_sync( + client=self._httpx_client, + message=notification_message + ) + await asyncio.sleep(settings.rpc.polling_interval) continue @@ -286,6 +314,20 @@ async def _detect_events(self): e, settings.rpc.polling_interval, ) + + if int(time.time()) - self._last_reporting_message_sent >= 600: + self._last_reporting_message_sent = int(time.time()) + notification_message = EpochProcessingIssue( + instanceID=settings.instance_id, + 
issueType=SnapshotterReportState.UNHEALTHY_EPOCH_PROCESSING.value, + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': f'Error : {e}'}) + ) + send_epoch_processing_failure_notification_sync( + client=self._httpx_client, + message=notification_message + ) + await asyncio.sleep(settings.rpc.polling_interval) continue diff --git a/snapshotter/tests/test_tg_reporting_service.py b/snapshotter/tests/test_tg_reporting_service.py new file mode 100644 index 0000000..36d1dc4 --- /dev/null +++ b/snapshotter/tests/test_tg_reporting_service.py @@ -0,0 +1,63 @@ +import time +import json +import asyncio +from httpx import AsyncClient +from httpx import AsyncHTTPTransport +from httpx import Limits +from httpx import Timeout +from snapshotter.utils.callback_helpers import send_failure_notifications_async +from snapshotter.utils.models.data_models import SnapshotterReportData +from snapshotter.utils.models.data_models import SnapshotterIssue +from snapshotter.utils.models.data_models import SnapshotterReportState +from snapshotter.utils.models.data_models import SnapshotterStatus +from snapshotter.utils.default_logger import logger +from snapshotter.settings.config import settings + + + +# ensure telegram_url / telegram_chat_id are set in config/settings.json +# telegram_url endpoint needs to be active +async def test_tg_reporting_call(): + + project_id = 'test_project_id' + epoch_id = 0 + + async_client = AsyncClient( + timeout=Timeout(timeout=5.0), + follow_redirects=False, + transport=AsyncHTTPTransport( + limits=Limits( + max_connections=200, + max_keepalive_connections=50, + keepalive_expiry=None, + ), + ), + ) + + notification_message = SnapshotterReportData( + snapshotterIssue=SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, + projectID=project_id, + epochId=epoch_id, + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': f'Error : TEST ERROR MESSAGE'}), + ), + 
snapshotterStatus=SnapshotterStatus( + projects=[], + ), + ) + + await send_failure_notifications_async( + client=async_client, message=notification_message, + ) + + # wait for the callback to complete + await asyncio.sleep(5) + + +if __name__ == '__main__': + try: + asyncio.get_event_loop().run_until_complete(test_tg_reporting_call()) + except Exception as e: + logger.opt(exception=True).error('exception: {}', e) \ No newline at end of file diff --git a/snapshotter/utils/callback_helpers.py b/snapshotter/utils/callback_helpers.py index 1050421..c08ce05 100644 --- a/snapshotter/utils/callback_helpers.py +++ b/snapshotter/utils/callback_helpers.py @@ -12,6 +12,10 @@ from snapshotter.settings.config import settings from snapshotter.utils.default_logger import logger +from snapshotter.utils.models.data_models import TelegramEpochProcessingReportMessage +from snapshotter.utils.models.data_models import TelegramSnapshotterReportMessage +from snapshotter.utils.models.data_models import SnapshotterReportData +from snapshotter.utils.models.data_models import EpochProcessingIssue from snapshotter.utils.models.message_models import EpochBase from snapshotter.utils.models.message_models import SnapshotProcessMessage from snapshotter.utils.rpc import RpcHelper @@ -66,22 +70,23 @@ def sync_notification_callback_result_handler(f: functools.partial): logger.debug('Callback or notification result:{}', result) -async def send_failure_notifications_async(client: AsyncClient, message: BaseModel): +async def send_failure_notifications_async(client: AsyncClient, message: SnapshotterReportData): """ Sends failure notifications to the configured reporting services. Args: client (AsyncClient): The async HTTP client to use for sending notifications. - message (BaseModel): The message to send as notification. + message (SnapshotterReportData): The message to send as notification. 
Returns: None """ + if settings.reporting.service_url: f = asyncio.ensure_future( client.post( url=urljoin(settings.reporting.service_url, '/reportIssue'), - json=message.dict(), + json=message.snapshotterIssue.dict(), ), ) f.add_done_callback(misc_notification_callback_result_handler) @@ -90,19 +95,35 @@ async def send_failure_notifications_async(client: AsyncClient, message: BaseMod f = asyncio.ensure_future( client.post( url=settings.reporting.slack_url, - json=message.dict(), + json=message.snapshotterIssue.dict(), ), ) f.add_done_callback(misc_notification_callback_result_handler) + if settings.reporting.telegram_url and settings.reporting.telegram_chat_id: + reporting_message = TelegramSnapshotterReportMessage( + chatId=settings.reporting.telegram_chat_id, + slotId=settings.slot_id, + issue=message.snapshotterIssue, + status=message.snapshotterStatus, + ) + + f = asyncio.ensure_future( + client.post( + url=urljoin(settings.reporting.telegram_url, '/reportSnapshotIssue'), + json=reporting_message.dict(), + ), + ) + f.add_done_callback(misc_notification_callback_result_handler) -def send_failure_notifications_sync(client: SyncClient, message: BaseModel): + +def send_failure_notifications_sync(client: SyncClient, message: SnapshotterReportData): """ - Sends failure notifications synchronously to the reporting service and/or Slack. + Sends failure notifications synchronously to the configured reporting services. Args: client (SyncClient): The HTTP client to use for sending notifications. - message (BaseModel): The message to send as notification. + message (SnapshotterReportData): The message to send as notification. 
Returns: None @@ -111,7 +132,7 @@ def send_failure_notifications_sync(client: SyncClient, message: BaseModel): f = functools.partial( client.post, url=urljoin(settings.reporting.service_url, '/reportIssue'), - json=message.dict(), + json=message.snapshotterIssue.dict(), ) sync_notification_callback_result_handler(f) @@ -119,8 +140,76 @@ def send_failure_notifications_sync(client: SyncClient, message: BaseModel): f = functools.partial( client.post, url=settings.reporting.slack_url, - json=message.dict(), + json=message.snapshotterIssue.dict(), + ) + sync_notification_callback_result_handler(f) + + if settings.reporting.telegram_url and settings.reporting.telegram_chat_id: + reporting_message = TelegramSnapshotterReportMessage( + chatId=settings.reporting.telegram_chat_id, + slotId=settings.slot_id, + issue=message.snapshotterIssue, + status=message.snapshotterStatus, + ) + + f = functools.partial( + client.post, + url=urljoin(settings.reporting.telegram_url, '/reportSnapshotIssue'), + json=reporting_message.dict(), + ) + sync_notification_callback_result_handler(f) + + +async def send_epoch_processing_failure_notification_async(client: AsyncClient, message: EpochProcessingIssue): + """ + Sends epoch processing failure notifications asynchronously to the Telegram reporting service. + + Args: + client (AsyncClient): The async HTTP client to use for sending notifications. + message (EpochProcessingIssue): The message to send as notification. 
+ + Returns: + None + """ + if settings.reporting.telegram_url and settings.reporting.telegram_chat_id: + reporting_message = TelegramEpochProcessingReportMessage( + chatId=settings.reporting.telegram_chat_id, + slotId=settings.slot_id, + issue=message, + ) + + f = asyncio.ensure_future( + client.post( + url=urljoin(settings.reporting.telegram_url, '/reportEpochProcessingIssue'), + json=reporting_message.dict(), + ), + ) + f.add_done_callback(misc_notification_callback_result_handler) + + +def send_epoch_processing_failure_notification_sync(client: SyncClient, message: EpochProcessingIssue): + """ + Sends epoch processing failure notifications synchronously to the Telegram reporting service. + + Args: + client (SyncClient): The HTTP client to use for sending notifications. + message (EpochProcessingIssue): The message to send as notification. + + Returns: + None + """ + if settings.reporting.telegram_url and settings.reporting.telegram_chat_id: + reporting_message = TelegramEpochProcessingReportMessage( + chatId=settings.reporting.telegram_chat_id, + slotId=settings.slot_id, + issue=message, ) + + f = functools.partial( + client.post, + url=urljoin(settings.reporting.telegram_url, '/reportEpochProcessingIssue'), + json=reporting_message.dict(), + ) sync_notification_callback_result_handler(f) diff --git a/snapshotter/utils/generic_worker.py b/snapshotter/utils/generic_worker.py index dc07b28..4fdb1f4 100644 --- a/snapshotter/utils/generic_worker.py +++ b/snapshotter/utils/generic_worker.py @@ -19,6 +19,7 @@ from grpclib.client import Channel from httpx import AsyncClient from httpx import AsyncHTTPTransport +from httpx import Client from httpx import Limits from httpx import Timeout from ipfs_cid import cid_sha256_hash @@ -34,10 +35,13 @@ from snapshotter.settings.config import settings from snapshotter.utils.callback_helpers import misc_notification_callback_result_handler from snapshotter.utils.callback_helpers import send_failure_notifications_async +from 
snapshotter.utils.callback_helpers import send_failure_notifications_sync from snapshotter.utils.default_logger import logger from snapshotter.utils.file_utils import read_json_file from snapshotter.utils.models.data_models import SnapshotterIssue +from snapshotter.utils.models.data_models import SnapshotterReportData from snapshotter.utils.models.data_models import SnapshotterReportState +from snapshotter.utils.models.data_models import SnapshotterStatus from snapshotter.utils.models.message_models import SnapshotProcessMessage from snapshotter.utils.models.message_models import SnapshotSubmittedMessage from snapshotter.utils.models.message_models import SnapshotSubmittedMessageLite @@ -127,6 +131,7 @@ def __init__(self): self.protocol_state_contract_address = settings.protocol_state.address self.initialized = False self.logger = logger.bind(module='GenericAsyncWorker') + self.status = SnapshotterStatus(projects=[]) def _notification_callback_result_handler(self, fut: asyncio.Future): """ @@ -250,6 +255,28 @@ async def _submit_to_snap_api_and_check(self, project_id: str, epoch: SnapshotPr '❌ Simulation snapshot generation failed: {}', epoch, ) self.logger.info('Please check your config and if issue persists please reach out to the team!') + self.status.totalMissedSubmissions += 1 + self.status.consecutiveMissedSubmissions += 1 + notification_message = SnapshotterReportData( + snapshotterIssue=SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, + projectID=project_id, + epochId=str(epoch.epochId), + timeOfReporting=str(time.time()), + extra=json.dumps({ + 'issueDetails': f'Error : {e}', + }), + ), + snapshotterStatus=self.status, + ) + sync_client = Client( + timeout=Timeout(timeout=5.0), + follow_redirects=False, + ) + send_failure_notifications_sync( + client=sync_client, message=notification_message, + ) sys.exit(1) @asynccontextmanager @@ -378,13 +405,18 @@ async def _commit_payload( 'Exception 
uploading snapshot to IPFS for epoch {}: {}, Error: {},' 'sending failure notifications', epoch, snapshot, e, ) - notification_message = SnapshotterIssue( - instanceID=settings.instance_id, - issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, - projectID=project_id, - epochId=str(epoch.epochId), - timeOfReporting=str(time.time()), - extra=json.dumps({'issueDetails': f'Error : {e}'}), + self.status.totalMissedSubmissions += 1 + self.status.consecutiveMissedSubmissions += 1 + notification_message = SnapshotterReportData( + snapshotterIssue=SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, + projectID=project_id, + epochId=str(epoch.epochId), + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': f'Error : {e}'}), + ), + snapshotterStatus=self.status, ) await send_failure_notifications_async( client=self._client, message=notification_message, @@ -398,17 +430,27 @@ async def _commit_payload( 'Exception submitting snapshot to collector for epoch {}: {}, Error: {},' 'sending failure notifications', epoch, snapshot, e, ) - notification_message = SnapshotterIssue( - instanceID=settings.instance_id, - issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, - projectID=project_id, - epochId=str(epoch.epochId), - timeOfReporting=str(time.time()), - extra=json.dumps({'issueDetails': f'Error : {e}'}), + self.status.totalMissedSubmissions += 1 + self.status.consecutiveMissedSubmissions += 1 + + notification_message = SnapshotterReportData( + snapshotterIssue=SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, + projectID=project_id, + epochId=str(epoch.epochId), + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': f'Error : {e}'}), + ), + snapshotterStatus=self.status, ) await send_failure_notifications_async( client=self._client, message=notification_message, ) + else: + # reset consecutive missed snapshots counter + 
self.status.consecutiveMissedSubmissions = 0 + self.status.totalSuccessfulSubmissions += 1 # upload to web3 storage if storage_flag: @@ -559,7 +601,11 @@ async def _init_protocol_meta(self): # TODO: combine these into a single call try: source_block_time = await self._anchor_rpc_helper.web3_call( - [self.protocol_state_contract.functions.SOURCE_CHAIN_BLOCK_TIME(Web3.to_checksum_address(settings.data_market))], + [ + self.protocol_state_contract.functions.SOURCE_CHAIN_BLOCK_TIME( + Web3.to_checksum_address(settings.data_market), + ), + ], ) except Exception as e: self.logger.exception( diff --git a/snapshotter/utils/models/data_models.py b/snapshotter/utils/models/data_models.py index 50d681a..0ea6e5d 100644 --- a/snapshotter/utils/models/data_models.py +++ b/snapshotter/utils/models/data_models.py @@ -134,6 +134,7 @@ class SnapshotterStatus(BaseModel): totalSuccessfulSubmissions: int = 0 totalIncorrectSubmissions: int = 0 totalMissedSubmissions: int = 0 + consecutiveMissedSubmissions: int = 0 projects: List[ProjectStatus] @@ -186,3 +187,29 @@ class UnfinalizedSnapshot(BaseModel): class TaskStatusRequest(BaseModel): task_type: str wallet_address: str + + +class SnapshotterReportData(BaseModel): + snapshotterIssue: SnapshotterIssue + snapshotterStatus: SnapshotterStatus + + +class TelegramSnapshotterReportMessage(BaseModel): + chatId: int + slotId: int + issue: SnapshotterIssue + status: SnapshotterStatus + + +class EpochProcessingIssue(BaseModel): + instanceID: str + issueType: str + timeOfReporting: str + extra: Optional[str] = '' + + +class TelegramEpochProcessingReportMessage(BaseModel): + chatId: int + slotId: int + issue: EpochProcessingIssue + diff --git a/snapshotter/utils/models/settings_model.py b/snapshotter/utils/models/settings_model.py index ccd2404..5aa6fdc 100644 --- a/snapshotter/utils/models/settings_model.py +++ b/snapshotter/utils/models/settings_model.py @@ -47,6 +47,9 @@ class Timeouts(BaseModel): class ReportingConfig(BaseModel): slack_url: str 
service_url: str + telegram_url: str + telegram_chat_id: str + failure_report_frequency: int class Logs(BaseModel): diff --git a/snapshotter/utils/snapshot_worker.py b/snapshotter/utils/snapshot_worker.py index 8f36cf0..aacbe40 100644 --- a/snapshotter/utils/snapshot_worker.py +++ b/snapshotter/utils/snapshot_worker.py @@ -13,6 +13,7 @@ from snapshotter.utils.data_utils import get_snapshot_submision_window from snapshotter.utils.generic_worker import GenericAsyncWorker from snapshotter.utils.models.data_models import SnapshotterIssue +from snapshotter.utils.models.data_models import SnapshotterReportData from snapshotter.utils.models.data_models import SnapshotterReportState from snapshotter.utils.models.message_models import SnapshotProcessMessage @@ -97,13 +98,19 @@ async def _process(self, msg_obj: SnapshotProcessMessage, task_type: str, eth_pr 'sending failure notifications', msg_obj, e, ) - notification_message = SnapshotterIssue( - instanceID=settings.instance_id, - issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, - projectID=f'{task_type}:{settings.namespace}', - epochId=str(msg_obj.epochId), - timeOfReporting=str(time.time()), - extra=json.dumps({'issueDetails': f'Error : {e}'}), + self.status.totalMissedSubmissions += 1 + self.status.consecutiveMissedSubmissions += 1 + + notification_message = SnapshotterReportData( + snapshotterIssue=SnapshotterIssue( + instanceID=settings.instance_id, + issueType=SnapshotterReportState.MISSED_SNAPSHOT.value, + projectID=f'{task_type}:{settings.namespace}', + epochId=str(msg_obj.epochId), + timeOfReporting=str(time.time()), + extra=json.dumps({'issueDetails': f'Error : {e}'}), + ), + snapshotterStatus=self.status, ) await send_failure_notifications_async( diff --git a/snapshotter_autofill.sh b/snapshotter_autofill.sh index fd1ecbe..58d6313 100755 --- a/snapshotter_autofill.sh +++ b/snapshotter_autofill.sh @@ -53,7 +53,15 @@ if [ "$DATA_MARKET_CONTRACT" ]; then fi if [ "$POWERLOOM_REPORTING_URL" ]; then - echo 
"Found SLACK_REPORTING_URL ${POWERLOOM_REPORTING_URL}"; + echo "Found POWERLOOM_REPORTING_URL ${POWERLOOM_REPORTING_URL}"; +fi + +if [ "$TELEGRAM_REPORTING_URL" ]; then + echo "Found TELEGRAM_REPORTING_URL ${TELEGRAM_REPORTING_URL}"; +fi + +if [ "$TELEGRAM_CHAT_ID" ]; then + echo "Found TELEGRAM_CHAT_ID ${TELEGRAM_CHAT_ID}"; fi if [ "$WEB3_STORAGE_TOKEN" ]; then @@ -76,7 +84,8 @@ export web3_storage_token="${WEB3_STORAGE_TOKEN:-}" export local_collector_port="${LOCAL_COLLECTOR_PORT:-50051}" export slack_reporting_url="${SLACK_REPORTING_URL:-}" export powerloom_reporting_url="${POWERLOOM_REPORTING_URL:-}" - +export telegram_reporting_url="${TELEGRAM_REPORTING_URL:-}" +export telegram_chat_id="${TELEGRAM_CHAT_ID:-}" # If IPFS_URL is empty, clear IPFS API key and secret @@ -94,6 +103,8 @@ echo "Using data market contract: ${DATA_MARKET_CONTRACT}" echo "Using slack reporting url: ${slack_reporting_url}" echo "Using powerloom reporting url: ${powerloom_reporting_url}" echo "Using web3 storage token: ${web3_storage_token}" +echo "Using telegram reporting url: ${telegram_reporting_url}" +echo "Using telegram chat id: ${telegram_chat_id}" sed -i'.backup' "s#relevant-namespace#$namespace#" config/settings.json @@ -123,4 +134,7 @@ sed -i'.backup' "s#signer-account-private-key#$SIGNER_ACCOUNT_PRIVATE_KEY#" conf sed -i'.backup' "s#local-collector-port#$local_collector_port#" config/settings.json +sed -i'.backup' "s#https://telegram-reporting-url#$telegram_reporting_url#" config/settings.json +sed -i'.backup' "s#telegram-chat-id#$telegram_chat_id#" config/settings.json + echo 'settings has been populated!'