Skip to content

Commit

Permalink
Merge pull request #51 from PowerLoom/feat/telegram-health-monitoring
Browse files Browse the repository at this point in the history
 Telegram notifications
  • Loading branch information
SwaroopH authored Sep 2, 2024
2 parents 203a542 + 3f50726 commit 42be68c
Show file tree
Hide file tree
Showing 15 changed files with 375 additions and 46 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ NOTE: It is recommended to run `build.sh` in a screen or tmux session so that th
git clone https://github.com/PowerLoom/snapshotter-lite-v2.git powerloom-pre-mainnet
```
This will clone the repository into a directory named `powerloom`.

3. Change your working directory to the `powerloom-pre-mainnet` directory:
```bash
cd powerloom-pre-mainnet
Expand Down Expand Up @@ -233,7 +233,7 @@ If you want to run the Snapshotter Lite Node without Docker, you need to make su
git clone https://github.com/PowerLoom/snapshotter-lite-v2.git powerloom-pre-mainnet
```
This will clone the repository into a directory named `powerloom`.
2. Change your working directory to the `powerloom-pre-mainnet` directory:
```bash
cd powerloom-pre-mainnet
Expand All @@ -248,9 +248,17 @@ If you want to run the Snapshotter Lite Node without Docker, you need to make su
5. Your node should start in background and you should start seeing logs in your terminal.
6. To stop the node, you can run `pkill -f snapshotter` in a new terminal window.
## Monitoring and Debugging
### Monitoring
To enable Telegram reporting for snapshotter issues:
1. Open the conversation with [@PowerloomReportingBot](https://t.me/PowerloomReportingBot) in the Telegram App and start a conversation.
2. Start the bot by typing the `/start` command in the chat. You will receive a response containing your `Chat ID` for the bot.
3. Enter the `Chat ID` when prompted on node startup.
4. You will now receive an error report whenever your node fails to process an epoch or snapshot.
### Debugging
Usually the easiest way to fix node related issues is to restart the node. If you're facing issues with the node, you can try going through the logs present in the `logs` directory. If you're unable to find the issue, you can reach out to us on [Discord](https://powerloom.io/discord) and we will be happy to help you out.
Expand Down
7 changes: 7 additions & 0 deletions build-dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ if [ ! -f .env ]; then
sed -i'.backup' "s#<slot-id>#$SLOT_ID#" .env
fi

# ask user for TELEGRAM_CHAT_ID and replace it in .env
if [ -z "$TELEGRAM_CHAT_ID" ]; then
echo "Enter Your TELEGRAM_CHAT_ID (Optional, leave blank to skip.): ";
read TELEGRAM_CHAT_ID;
sed -i'.backup' "s#<telegram-chat-id>#$TELEGRAM_CHAT_ID#" .env
fi

fi

source .env
Expand Down
7 changes: 7 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ if [ ! -f .env ]; then
sed -i'.backup' "s#<slot-id>#$SLOT_ID#" .env
fi

# ask user for TELEGRAM_CHAT_ID and replace it in .env
if [ -z "$TELEGRAM_CHAT_ID" ]; then
echo "Enter Your TELEGRAM_CHAT_ID (Optional, leave blank to skip.): ";
read TELEGRAM_CHAT_ID;
sed -i'.backup' "s#<telegram-chat-id>#$TELEGRAM_CHAT_ID#" .env
fi

fi

source .env
Expand Down
2 changes: 2 additions & 0 deletions docker-compose-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ services:
- POWERLOOM_REPORTING_URL=$POWERLOOM_REPORTING_URL
- WEB3_STORAGE_TOKEN=$WEB3_STORAGE_TOKEN
- NAMESPACE=$NAMESPACE
- TELEGRAM_REPORTING_URL=$TELEGRAM_REPORTING_URL
- TELEGRAM_CHAT_ID=$TELEGRAM_CHAT_ID
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
interval: 10s
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ services:
- POWERLOOM_REPORTING_URL=$POWERLOOM_REPORTING_URL
- WEB3_STORAGE_TOKEN=$WEB3_STORAGE_TOKEN
- NAMESPACE=$NAMESPACE
- TELEGRAM_REPORTING_URL=$TELEGRAM_REPORTING_URL
- TELEGRAM_CHAT_ID=$TELEGRAM_CHAT_ID
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8002/health"]
interval: 10s
Expand Down
2 changes: 2 additions & 0 deletions env.example
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ IPFS_API_SECRET=
SLACK_REPORTING_URL=
WEB3_STORAGE_TOKEN=
DASHBOARD_ENABLED=
TELEGRAM_REPORTING_URL=https://tg-testing.powerloom.io/
TELEGRAM_CHAT_ID=<telegram-chat-id>
30 changes: 20 additions & 10 deletions snapshotter/processor_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from web3 import Web3
import time

from snapshotter.utils.models.data_models import SnapshotterReportData
from snapshotter.utils.models.data_models import SnapshotterReportState
from snapshotter.utils.models.data_models import SnapshotterIssue
from snapshotter.settings.config import projects_config
from snapshotter.settings.config import settings
from snapshotter.utils.data_utils import get_snapshot_submision_window
Expand All @@ -31,6 +31,9 @@
from snapshotter.utils.snapshot_worker import SnapshotAsyncWorker
from snapshotter.utils.snapshot_utils import get_eth_price_usd
from snapshotter.utils.callback_helpers import send_failure_notifications_async
from snapshotter.utils.models.data_models import SnapshotterIssue
from snapshotter.utils.models.data_models import SnapshotterStatus


class ProcessorDistributor:
_anchor_rpc_helper: RpcHelper
Expand Down Expand Up @@ -132,7 +135,9 @@ async def init(self):
self._epoch_size = epoch_size

try:
snapshotter_address = self._protocol_state_contract.functions.slotSnapshotterMapping(settings.slot_id).call()
snapshotter_address = self._protocol_state_contract.functions.slotSnapshotterMapping(
settings.slot_id,
).call()
if snapshotter_address != to_checksum_address(settings.instance_id):
self._logger.error('Signer Account is not the one configured in slot, exiting!')
exit(0)
Expand Down Expand Up @@ -229,13 +234,18 @@ async def _epoch_release_processor(self, message: EpochReleasedEvent):
'Exception in getting eth price: {}',
e,
)
notification_message = SnapshotterIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.MISSED_SNAPSHOT.value,
projectID='ETH_PRICE_LOAD',
epochId=str(message.epochId),
timeOfReporting=str(time.time()),
extra=json.dumps({'issueDetails': f'Error : {e}'}),
self.snapshot_worker.status.totalMissedSubmissions += 1
self.snapshot_worker.status.consecutiveMissedSubmissions += 1
notification_message = SnapshotterReportData(
snapshotterIssue=SnapshotterIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.MISSED_SNAPSHOT.value,
projectID='ETH_PRICE_LOAD',
epochId=str(message.epochId),
timeOfReporting=str(time.time()),
extra=json.dumps({'issueDetails': f'Error : {e}'}),
),
snapshotterStatus=self.snapshot_worker.status,
)
await send_failure_notifications_async(
client=self._client, message=notification_message,
Expand All @@ -246,7 +256,7 @@ async def _epoch_release_processor(self, message: EpochReleasedEvent):
# release for snapshotting
asyncio.ensure_future(
self._distribute_callbacks_snapshotting(
project_type, epoch, eth_price_dict
project_type, epoch, eth_price_dict,
),
)

Expand Down
42 changes: 42 additions & 0 deletions snapshotter/system_event_detector.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import asyncio
import multiprocessing
import resource
Expand All @@ -12,12 +13,15 @@
import sys
from snapshotter.processor_distributor import ProcessorDistributor
from snapshotter.settings.config import settings
from snapshotter.utils.callback_helpers import send_epoch_processing_failure_notification_sync
from snapshotter.utils.default_logger import logger
from snapshotter.utils.exceptions import GenericExitOnSignal
from snapshotter.utils.file_utils import read_json_file
from snapshotter.utils.models.data_models import DailyTaskCompletedEvent
from snapshotter.utils.models.data_models import DayStartedEvent
from snapshotter.utils.models.data_models import EpochReleasedEvent
from snapshotter.utils.models.data_models import EpochProcessingIssue
from snapshotter.utils.models.data_models import SnapshotterReportState
from snapshotter.utils.rpc import get_event_sig_and_abi
from snapshotter.utils.rpc import RpcHelper
from urllib.parse import urljoin
Expand Down Expand Up @@ -76,6 +80,7 @@ def __init__(self, name, **kwargs):
abi=self.contract_abi,
)
self._last_reporting_service_ping = 0
self._last_reporting_message_sent = 0

# event EpochReleased(uint256 indexed epochId, uint256 begin, uint256 end, uint256 timestamp);
# event DayStartedEvent(uint256 dayId, uint256 timestamp);
Expand Down Expand Up @@ -132,6 +137,16 @@ async def _init_check_and_report(self):
'❌ Dummy Event processing failed! Error: {}', e,
)
self._logger.info("Please check your config and if issue persists please reach out to the team!")
notification_message = EpochProcessingIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.UNHEALTHY_EPOCH_PROCESSING.value,
timeOfReporting=str(time.time()),
extra=json.dumps({'issueDetails': f'Error : {e}'})
)
send_epoch_processing_failure_notification_sync(
client=self._httpx_client,
message=notification_message
)
sys.exit(1)

async def get_events(self, from_block: int, to_block: int):
Expand Down Expand Up @@ -251,6 +266,19 @@ async def _detect_events(self):
settings.rpc.polling_interval,
)

if int(time.time()) - self._last_reporting_message_sent >= 600:
self._last_reporting_message_sent = int(time.time())
notification_message = EpochProcessingIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.UNHEALTHY_EPOCH_PROCESSING.value,
timeOfReporting=str(time.time()),
extra=json.dumps({'issueDetails': f'Error : {e}'})
)
send_epoch_processing_failure_notification_sync(
client=self._httpx_client,
message=notification_message
)

await asyncio.sleep(settings.rpc.polling_interval)
continue

Expand Down Expand Up @@ -286,6 +314,20 @@ async def _detect_events(self):
e,
settings.rpc.polling_interval,
)

if int(time.time()) - self._last_reporting_message_sent >= 600:
self._last_reporting_message_sent = int(time.time())
notification_message = EpochProcessingIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.UNHEALTHY_EPOCH_PROCESSING.value,
timeOfReporting=str(time.time()),
extra=json.dumps({'issueDetails': f'Error : {e}'})
)
send_epoch_processing_failure_notification_sync(
client=self._httpx_client,
message=notification_message
)

await asyncio.sleep(settings.rpc.polling_interval)
continue

Expand Down
63 changes: 63 additions & 0 deletions snapshotter/tests/test_tg_reporting_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import time
import json
import asyncio
from httpx import AsyncClient
from httpx import AsyncHTTPTransport
from httpx import Limits
from httpx import Timeout
from snapshotter.utils.callback_helpers import send_failure_notifications_async
from snapshotter.utils.models.data_models import SnapshotterReportData
from snapshotter.utils.models.data_models import SnapshotterIssue
from snapshotter.utils.models.data_models import SnapshotterReportState
from snapshotter.utils.models.data_models import SnapshotterStatus
from snapshotter.utils.default_logger import logger
from snapshotter.settings.config import settings



# ensure telegram__url / telegram_chat_id are set in config/settings.json
# telegram_url endpoint needs to be active
async def test_tg_reporting_call():

project_id = 'test_project_id'
epoch_id = 0

async_client = AsyncClient(
timeout=Timeout(timeout=5.0),
follow_redirects=False,
transport=AsyncHTTPTransport(
limits=Limits(
max_connections=200,
max_keepalive_connections=50,
keepalive_expiry=None,
),
),
)

notification_message = SnapshotterReportData(
snapshotterIssue=SnapshotterIssue(
instanceID=settings.instance_id,
issueType=SnapshotterReportState.MISSED_SNAPSHOT.value,
projectID=project_id,
epochId=epoch_id,
timeOfReporting=str(time.time()),
extra=json.dumps({'issueDetails': f'Error : TEST ERROR MESSAGE'}),
),
snapshotterStatus=SnapshotterStatus(
projects=[],
),
)

await send_failure_notifications_async(
client=async_client, message=notification_message,
)

# wait for the callback to complete
await asyncio.sleep(5)


if __name__ == '__main__':
try:
asyncio.get_event_loop().run_until_complete(test_tg_reporting_call())
except Exception as e:
logger.opt(exception=True).error('exception: {}', e)
Loading

0 comments on commit 42be68c

Please sign in to comment.