Skip to content

Commit

Permalink
Feature/telegram bot (#36)
Browse files Browse the repository at this point in the history
* telegram bot structure

* Telegram Bot "Harry" is ready to get on duty!

* Added short readme

* Update README.md

Co-authored-by: tomazmm <[email protected]>
Co-authored-by: Tomaž Mesarec <[email protected]>
  • Loading branch information
3 people authored Sep 18, 2020
1 parent 3471e92 commit da65e1b
Show file tree
Hide file tree
Showing 9 changed files with 336 additions and 192 deletions.
6 changes: 2 additions & 4 deletions distribute/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#base image
FROM python:3.7

COPY main.py/ /
COPY requirements.txt/ /
COPY variables.env/ /
ADD . /

RUN pip3 install -r requirements.txt

CMD [ "python3", "./main.py" ]
CMD [ "python3", "bot/licx_distribute_bot.py" ]
17 changes: 17 additions & 0 deletions distribute/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Liquid-ICX Telegram Bot

### Development
The easiest way to start a bot is via:
```
python3 licx_distribute_bot.py
```
### Production
It is recommended to run a bot in a docker container.
Execute to command bellow to build an image:
```
docker build -t licx-bot .
```
And later, to run the docker container execute:
```
docker run --env-file variables.env --name licx-bot licx-bot
```
Empty file added distribute/bot/__init__.py
Empty file.
24 changes: 24 additions & 0 deletions distribute/bot/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import logging
import os

from iconsdk.icon_service import IconService
from iconsdk.providers.http_provider import HTTPProvider
from iconsdk.wallet.wallet import KeyWallet

TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
ICX_SERVICE = IconService(HTTPProvider(os.getenv("ICON_SERVICE_PROVIDER")))
SCORE_ADDRESS = os.getenv("SCORE_ADDRESS")
TRACKER_API = os.getenv("TRACKER_API_URL")
WALLET = KeyWallet.load(bytes.fromhex(os.getenv("PRIVATE_KEY")))
ADMIN_USER_IDS = [int(admin_id) for admin_id in
os.environ['ADMIN_USER_IDS'].split(",")] if 'ADMIN_USER_IDS' in os.environ else []

# Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

# Paths
storage_path = os.sep.join([os.path.dirname(os.path.realpath(__file__)), os.path.pardir, 'storage'])
session_data_path = os.sep.join([storage_path, 'session.data'])

JOB_INTERVAL_IN_SECONDS = 10
190 changes: 190 additions & 0 deletions distribute/bot/licx_distribute_bot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
from telegram import TelegramError
from telegram.ext import Updater, PicklePersistence, CommandHandler, run_async
from constants import *
from service.icon_network_service import *

SCORE_CREATED_HEIGHT = getCreatedSCOREHeight(getCreateTX())


def error(update, context):
logger.warning('Update "%s" caused error: %s', update, context.error)


def setup_existing_user(dispatcher):
"""
Tasks to ensure smooth user experience for existing users upon Bot restart
"""

# Iterate over all existing users
chat_ids = dispatcher.user_data.keys()
delete_chat_ids = []
for chat_id in filter(lambda x: x in ADMIN_USER_IDS, chat_ids):
# Send a notification to existing users that the Bot got restarted
restart_message = '여보세요!\n' \
'Me, *Harry*, just got restarted on the server! 🤖\n' \
'To make sure you have the latest features, please start ' \
'a fresh chat with me by typing /start.'
try:
dispatcher.bot.send_message(chat_id, restart_message, parse_mode='markdown')
except TelegramError as e:
if 'bot was blocked by the user' in e.message:
delete_chat_ids.append(chat_id)
continue
else:
print("Got Error\n" + str(e) + "\nwith telegram user " + str(chat_id))

# Start monitoring jobs for all existing users
if 'job_started' not in dispatcher.user_data[chat_id]:
dispatcher.user_data[chat_id]['job_started'] = True
dispatcher.job_queue.run_repeating(distribution_ready_check, interval=JOB_INTERVAL_IN_SECONDS, context={
'chat_id': chat_id,
'user_data': dispatcher.user_data[chat_id]
})

for chat_id in delete_chat_ids:
logger.warning("Telegram user " + str(chat_id) + " blocked me; removing him from the user list")
delete_user(dispatcher, chat_id)


def delete_user(dispatcher, chat_id):
del dispatcher.user_data[chat_id]
del dispatcher.chat_data[chat_id]
del dispatcher.persistence.user_data[chat_id]
del dispatcher.persistence.chat_data[chat_id]

# Somehow session.data does not get updated if all users block the bot.
# That's why we delete the file ourselves.
if len(dispatcher.persistence.user_data) == 0:
if os.path.exists(session_data_path):
os.remove(session_data_path)


@run_async
def start(update, context):
"""
Send start message and start distribute job
"""

if not is_admin(update):
return

# Start job for user
if 'job_started' not in context.user_data:
context.job_queue.run_repeating(distribution_ready_check, interval=JOB_INTERVAL_IN_SECONDS, context={
'chat_id': update.message.chat.id,
'user_data': context.user_data
})
context.user_data['job_started'] = True
context.user_data['nodes'] = {}

update.message.reply_text(f"여보세요!\n"
f"I'm *Harry*, your *Liquid ICX distribution officer*! 🤖\n\n"
f"Once per *ICON Term* I call the *distribute* function "
f"of the LICX SCORE and notify you.\n"
f"To *manually* invoke distribute send me the /distribute command.\n\n"
f"ICON Term = 43120 blocks\n"
f"SCORE Address = {SCORE_ADDRESS}\n\n"
f"See you later!", parse_mode='markdown')

@run_async
def distribute_handler(update, context):
"""
Distribute ready check called by hand
"""

if not is_admin(update):
return

term_bounds = getCurrentTermBounds()
last_distribute_height = getLastDistributeEventHeight()
chat_id = update.message.chat.id
try:
distribute(context, chat_id, term_bounds, last_distribute_height)
except Exception as e:
logger.error(f"Distribute call failed:\n{e}")
context.bot.send_message(chat_id,
f"‼️ *LICX* Distribute called *failed* "
f"for term {term_bounds['start']} - {term_bounds['end']}\n\n"
f"Error message:\n"
f"{e.message}",
parse_mode='markdown')


def distribution_ready_check(context):
"""
This job executes distribute at the right time
"""

logger.info("checking if distribution is ready")
try:
term_bounds = getCurrentTermBounds()
last_distribute_height = getLastDistributeEventHeight()
if SCORE_CREATED_HEIGHT + (43120 * 2) < term_bounds["start"] and term_bounds["start"] > last_distribute_height:
distribute(context, context.job.context['chat_id'], term_bounds, last_distribute_height)
except Exception as e:
logger.error(e)


def distribute(context, chat_id, term_bounds, initial_distribute_height):
"""
Send distribute TX until new Distribute Event is emitted
"""

logger.info("distribution starts")
context.bot.send_message(chat_id,
f"*LICX* Joining, Reward Distribution, and Leaving is *starting* for "
f"term {term_bounds['start']} - {term_bounds['end']}!",
parse_mode='markdown')

while True:
logger.info("distribution iteration")
send_distribute_tx()
sleep(3)
if initial_distribute_height != getLastDistributeEventHeight():
logger.info("distribution ended")
context.bot.send_message(chat_id,
f"*LICX* Joining, Reward Distribution, and Leaving *successfully "
f"finished* for term {term_bounds['start']} - {term_bounds['end']}!",
parse_mode='markdown')
break


def is_admin(update):
if update.effective_user.id not in ADMIN_USER_IDS:
update.message.reply_text(f"❌ You are not an Admin! ❌\n"
f"I'm *Harry*, I'm a loyal bot.",
parse_mode='markdown')
return 0
return 1


def main():
"""
Init telegram bot, attach handlers and wait for incoming requests.
"""

# Init telegram bot
bot = Updater(TELEGRAM_BOT_TOKEN, persistence=PicklePersistence(filename=session_data_path),
use_context=True)
dispatcher = bot.dispatcher

setup_existing_user(dispatcher)

dispatcher.add_handler(CommandHandler('start', start))
dispatcher.add_handler(CommandHandler('distribute', distribute_handler))

# Add error handler
dispatcher.add_error_handler(error)

# Start the bot
bot.start_polling()
logger.info('LICX Distribute Bot is running...')

# Run the bot until you press Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
bot.idle()


if __name__ == '__main__':
main()
97 changes: 97 additions & 0 deletions distribute/bot/service/icon_network_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import requests
from iconsdk.builder.transaction_builder import CallTransactionBuilder

from iconsdk.exception import JSONRPCException
from iconsdk.signed_transaction import SignedTransaction
from tbears.libs.icon_integrate_test import SCORE_INSTALL_ADDRESS
from iconsdk.builder.call_builder import CallBuilder
from time import sleep
from constants import *


def getCurrentTermBounds() -> dict:
"""
:return: Term start/end-block height
"""

call = CallBuilder() \
.to(SCORE_INSTALL_ADDRESS) \
.method("getPRepTerm") \
.build()
prep_term = ICX_SERVICE.call(call)
return {
"start": int(prep_term["startBlockHeight"], 16),
"end": int(prep_term["endBlockHeight"], 16)
}


def getTXResult(tx_hash) -> dict:
while True:
try:
return ICX_SERVICE.get_transaction_result(tx_hash)
except JSONRPCException as e:
if e.args[0]["message"] == "Pending transaction":
sleep(1)


def getLastDistributeEventHeight() -> int:
params = {"page": 1, "count": 1000, "contractAddr": SCORE_ADDRESS}
contract_addr_response = requests.get(TRACKER_API + "/contract/eventLogList", params=params)

if contract_addr_response.status_code != 200:
raise BadRequestStatusException(contract_addr_response)

for log in contract_addr_response.json()["data"]:
found = log["eventLog"].find("Distribute")
if found > 0:
return log["height"]

return 0


def getCreateTX() -> str:
while True:
params = {"addr": SCORE_ADDRESS}
addr_response = requests.get(TRACKER_API + "/contract/info", params=params)
if addr_response.status_code == 200:
return addr_response.json()["data"]["createTx"]


def getCreatedSCOREHeight(create_tx) -> int:
while True:
params = {"txHash": create_tx}
tx_hash_response = requests.get(TRACKER_API + "/transaction/txDetail", params=params)
if tx_hash_response.status_code == 200:
return tx_hash_response.json()["data"]["height"]


def send_distribute_tx():
tx = CallTransactionBuilder() \
.from_(WALLET.get_address()) \
.to(SCORE_ADDRESS) \
.value(0) \
.nid(3) \
.step_limit(500000000) \
.nonce(100) \
.method("distribute") \
.params({}) \
.build()
tx = SignedTransaction(tx, WALLET)
tx_result = getTXResult(ICX_SERVICE.send_transaction(tx))
if tx_result['status'] == 0:
raise BadTxStatusException(tx_result)
logger.info(tx_result)


class BadRequestStatusException(Exception):
def __init__(self, response: requests.Response):
self.message = "Error while network request.\nReceived status code: " + \
str(response.status_code) + '\nReceived response: ' + response.text


class BadTxStatusException(Exception):
def __init__(self, tx_result: dict):
self.message = f"Error while *sending TX*.\n" \
f"Received status code: {tx_result['status']}\n" \
f"Received failure code: {tx_result['failure']['code']}\n" \
f"Received failure message: '{tx_result['failure']['message']}'"
Loading

0 comments on commit da65e1b

Please sign in to comment.