Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[add] MQTT backend #41

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 27 additions & 24 deletions chanconfig.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,31 @@
from plugins import (UrlShow, Twitter, Topic, Space, Reminder, TechWednesday, React,
TwitterStream, VUBMenu, Ascii, Giphy, Poll, StationMaster, LesRepublicains, CCC35)
TwitterStream, VUBMenu, Ascii, Giphy, Poll, StationMaster, LesRepublicains, CCC35,
MQTTPlugin)
from ircbot.plugin import HelpPlugin
from config import TWITTER_CONFIG, GIPHY_KEY
from config import TWITTER_CONFIG, GIPHY_KEY, MQTT_HOST, MQTT_TOPICS

# Rate limit for incoming UrLab notifications in seconds
RATELIMIT = {
# Hal events
'bell': 120,
'passage': 3600,
'kitchen_move': 3600,
'doors_stairs': 900,

# Incubator activity stream
'Event.a créé': 900,
'Event.a édité': 900,

'Project.a créé': 900,
'Project.a édité': 900,
'Project.participe à': 900,

'Task.a ajouté la tâche': 3600,
'Task.a fini la tâche': 3600,
'Task.a ré-ajouté la tâche': 3600,

'wiki.revision': 300,
}


CHANS = {
Expand All @@ -22,6 +46,7 @@
StationMaster(),
LesRepublicains(),
CCC35(),
MQTTPlugin(MQTT_HOST, MQTT_TOPICS, RATELIMIT),
],
'QUERY': [
TechWednesday(),
Expand All @@ -30,28 +55,6 @@
],
}

# Rate limit for incoming UrLab notifications in seconds
RATELIMIT = {
# Hal events
'bell': 120,
'passage': 3600,
'kitchen_move': 3600,
'doors_stairs': 900,

# Incubator activity stream
'Event.a créé': 900,
'Event.a édité': 900,

'Project.a créé': 900,
'Project.a édité': 900,
'Project.participe à': 900,

'Task.a ajouté la tâche': 3600,
'Task.a fini la tâche': 3600,
'Task.a ré-ajouté la tâche': 3600,

'wiki.revision': 300,
}

try:
from local_chanconfig import CHANS, RATELIMIT
Expand Down
7 changes: 7 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@

GIPHY_KEY = "secret"

# MQTT configuration
MQTT_HOST = "hal.lan"
MQTT_TOPICS = [
"incubator/actstream",
"hal/eventstream",
]

try:
from local_config import * # pragma: no flakes # NOQA
except ImportError:
Expand Down
1 change: 1 addition & 0 deletions plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
from .station_master import StationMaster
from .les_republicains import LesRepublicains
from .ccc35 import CCC35
from .mqtt import MQTTPlugin
68 changes: 68 additions & 0 deletions plugins/mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import asyncio
from ircbot.plugin import BotPlugin
import paho.mqtt.client as mqtt
import json
from datetime import datetime

class MQTTPlugin(BotPlugin):
"""
MQTT subscriber for lechbot.
"""

def __init__(self, host, topics, ratelimit):
self.host = host
self.topics = topics
self.last_seen_keys = {}
self.ratelimit = ratelimit

self.mqtt = mqtt.Client()
self.mqtt.on_connect = self.on_mqtt_connect
self.mqtt.on_message = self.on_mqtt_message

def on_mqtt_connect(self, client, userdata, flags, rc):
self.bot.log.info("Connected to MQTT")
for topic in self.topics:
self.mqtt.subscribe(topic)

def on_mqtt_message(self, client, userdata, msg):
payload = msg.payload.decode()
try:
data = json.loads(payload)
except Exception as e:
self.bot.log.info("Got incorrect message: '%s' (%s)", str(payload), e)
return

key = data.get("key")
text = data.get("text")
now = datetime.now()

if not (key and text):
self.bot.log.info("Missing informations in the message: " + repr(data))
return

# Rate limit
last_seen = self.last_seen_keys.get(key, datetime.fromtimestamp(0))
if (now - last_seen).total_seconds() < self.ratelimit.get(key, 0):
self.bot.log.info("Got rate-limited event " + repr({
'key': key, 'time': now, 'text': text
}) + " / Last seen: " + repr(last_seen))
return

self.say(text)
self.bot.log.debug("Got " + repr({
'key': key, 'text': text
}))

self.last_seen_keys[key] = now


@asyncio.coroutine
def loop(self):
self.mqtt.loop()

@BotPlugin.on_connect
def startup(self):
self.bot.log.debug("Starting.")
self.mqtt.connect(self.host)
while True:
yield from self.loop()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be simpler to simply yield from self.mqtt.loop() here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't a asyc function, it doesn't work

Traceback (most recent call last):
  File "/usr/lib64/python3.4/asyncio/tasks.py", line 240, in _step
    result = coro.send(None)
  File "/home/etnarek/Projet/urlab/lechbot/plugins/mqtt.py", line 64, in startup
    yield from self.mqtt.loop()
TypeError: 'int' object is not iterable

1 change: 1 addition & 0 deletions requirements-frozen.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ pytz==2016.2
six==1.10.0
txaio==2.2.2
roman==2.0.0
paho-mqtt==1.4.0
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ autobahn
beautifulsoup4
pytz
roman
paho-mqtt