Skip to content

Commit

Permalink
Merge pull request #28 from daniviga/reloaded
Browse files Browse the repository at this point in the history
Reloaded
  • Loading branch information
daniviga authored Sep 8, 2023
2 parents 681f99d + 465870a commit da92935
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 42 deletions.
13 changes: 4 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Playing with IoT

[![Build Status](https://travis-ci.com/daniviga/bite.svg?branch=master)](https://travis-ci.com/daniviga/bite)
![AGPLv3](./docs/.badges/agpl3.svg)
![Python 3.9](./docs/.badges/python.svg)
![Python 3.11](./docs/.badges/python.svg)
![MQTT](./docs/.badges/mqtt.svg)
![Moby](./docs/.badges/moby.svg)
![docker-compose 3.7+](./docs/.badges/docker-compose.svg)
Expand All @@ -13,13 +13,6 @@ This project is for educational purposes only. It does not implement any
authentication and/or encryption protocol, so it is not suitable for real
production.

![Application Schema](./docs/application_chart.png)

### Future implementations

- Broker HA via [VerneMQ clustering](https://docs.vernemq.com/clustering/introduction)
- Stream analytics via [Apache Spark](https://spark.apache.org/)

## Installation

### Requirements
Expand All @@ -36,8 +29,10 @@ The application stack is composed by the following components:
- [Django](https://www.djangoproject.com/) with
[Django REST framework](https://www.django-rest-framework.org/)
web application (running via `gunicorn` in production mode)
- `mqtt-to-db` custom daemon to dump telemetry into the timeseries database
- `dispatcher` custom daemon to dump telemetry into the Kafka queue
- `handler` custom daemon to dump telemetry into the timeseries database from the Kafka queue
- telemetry payload is stored as json object (via PostgreSQL JSON data type)
- [Kafka](https://kafka.apache.org/) broker
- [Timescale](https://www.timescale.com/) DB,
a [PostgreSQL](https://www.postgresql.org/) database with a timeseries extension
- [Mosquitto](https://mosquitto.org/) MQTT broker (see alternatives below)
Expand Down
9 changes: 9 additions & 0 deletions bite/bite/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,22 @@

STATIC_ROOT = '/srv/appdata/bite/static'

REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': []
}

SKIP_WHITELIST = True

MQTT_BROKER = {
'HOST': 'broker',
'PORT': '1883',
}

KAFKA_BROKER = {
'HOST': 'kafka',
'PORT': '9092',
}

# If no local_settings.py is availble in the current folder let's try to
# load it from the application root
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,27 @@
import json
import time
import paho.mqtt.client as mqtt
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable
from asgiref.sync import sync_to_async
from asyncio_mqtt import Client
from aiomqtt import Client

from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist

from api.models import Device
from telemetry.models import Telemetry

MQTT_HOST = settings.MQTT_BROKER['HOST']
MQTT_PORT = int(settings.MQTT_BROKER['PORT'])


class Command(BaseCommand):
help = 'MQTT to DB deamon'

MQTT_HOST = settings.MQTT_BROKER['HOST']
MQTT_PORT = int(settings.MQTT_BROKER['PORT'])
KAFKA_HOST = settings.KAFKA_BROKER['HOST']
KAFKA_PORT = int(settings.KAFKA_BROKER['PORT'])
producer = None

@sync_to_async
def get_device(self, serial):
try:
Expand All @@ -47,24 +51,23 @@ def get_device(self, serial):
return None

@sync_to_async
def store_telemetry(self, device, payload):
Telemetry.objects.create(
device=device,
transport='mqtt',
clock=payload['clock'],
payload=payload['payload']
def dispatch(self, message):
self.producer.send(
'telemetry', {"transport": 'mqtt',
"body": message}
)

async def mqtt_broker(self):
async with Client(MQTT_HOST, port=MQTT_PORT) as client:
async with Client(self.MQTT_HOST, port=self.MQTT_PORT) as client:
# use shared subscription for HA/balancing
await client.subscribe("$share/telemetry/#")
async with client.unfiltered_messages() as messages:
async with client.messages() as messages:
async for message in messages:
payload = json.loads(message.payload.decode('utf-8'))
device = await self.get_device(message.topic)
if device is not None:
await self.store_telemetry(device, payload)
message_body = json.loads(
message.payload.decode('utf-8'))
await self.dispatch(message_body)
else:
self.stdout.write(
self.style.ERROR(
Expand All @@ -74,13 +77,28 @@ def handle(self, *args, **options):
client = mqtt.Client()
while True:
try:
client.connect(MQTT_HOST, MQTT_PORT)
client.connect(self.MQTT_HOST, self.MQTT_PORT)
break
except (socket.gaierror, ConnectionRefusedError):
self.stdout.write(
self.style.WARNING('WARNING: Broker not available'))
self.style.WARNING('WARNING: MQTT broker not available'))
time.sleep(5)

while True:
try:
self.producer = KafkaProducer(
bootstrap_servers='{}:{}'.format(
self.KAFKA_HOST, self.KAFKA_PORT
),
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
retries=5
)
break
except NoBrokersAvailable:
self.stdout.write(
self.style.WARNING('WARNING: Kafka broker not available'))
time.sleep(5)

self.stdout.write(self.style.SUCCESS('INFO: Broker subscribed'))
self.stdout.write(self.style.SUCCESS('INFO: Brokers subscribed'))
client.disconnect()
asyncio.run(self.mqtt_broker())
76 changes: 76 additions & 0 deletions bite/telemetry/management/commands/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# BITE - A Basic/IoT/Example
# Copyright (C) 2020-2021 Daniele Viganò <[email protected]>
#
# BITE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# BITE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import json
import time
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.exceptions import ObjectDoesNotExist

from api.models import Device
from telemetry.models import Telemetry


class Command(BaseCommand):
help = 'MQTT to DB deamon'

KAFKA_HOST = settings.KAFKA_BROKER['HOST']
KAFKA_PORT = int(settings.KAFKA_BROKER['PORT'])

def get_device(self, serial):
try:
return Device.objects.get(serial=serial)
except ObjectDoesNotExist:
return None

def store_telemetry(self, transport, message):
Telemetry.objects.create(
transport=transport,
device=self.get_device(message["device"]),
clock=message["clock"],
payload=message["payload"]
)

def handle(self, *args, **options):
while True:
try:
consumer = KafkaConsumer(
"telemetry",
bootstrap_servers='{}:{}'.format(
self.KAFKA_HOST, self.KAFKA_PORT
),
group_id="handler",
value_deserializer=lambda m: json.loads(m.decode('utf8')),
)
break
except NoBrokersAvailable:
self.stdout.write(
self.style.WARNING('WARNING: Kafka broker not available'))
time.sleep(5)

self.stdout.write(self.style.SUCCESS('INFO: Kafka broker subscribed'))
for message in consumer:
self.store_telemetry(
message.value["transport"],
message.value["body"]
)
consumer.unsuscribe()
8 changes: 4 additions & 4 deletions docker/django/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

FROM python:3.9-alpine AS builder
FROM python:3.11-alpine AS builder
RUN apk update && apk add gcc musl-dev postgresql-dev \
&& pip install psycopg2-binary

# ---

FROM python:3.9-alpine
FROM python:3.11-alpine
ENV PYTHONUNBUFFERED 1
ENV DJANGO_SETTINGS_MODULE "bite.settings"

RUN apk update && apk add --no-cache postgresql-libs \
&& wget https://github.com/jwilder/dockerize/releases/download/v0.6.1/dockerize-alpine-linux-amd64-v0.6.1.tar.gz -qO- \
&& wget https://github.com/jwilder/dockerize/releases/download/v0.7.0/dockerize-alpine-linux-amd64-v0.7.0.tar.gz -qO- \
| tar -xz -C /usr/local/bin
COPY --from=builder /usr/local/lib/python3.9/site-packages/ /usr/local/lib/python3.9/site-packages/
COPY --from=builder /usr/local/lib/python3.11/site-packages/ /usr/local/lib/python3.11/site-packages/
COPY --chown=1000:1000 requirements.txt /srv/app/bite/requirements.txt
RUN pip3 install --no-cache-dir -r /srv/app/bite/requirements.txt

Expand Down
10 changes: 9 additions & 1 deletion docker/docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ services:
ports:
- "${CUSTOM_DOCKER_IP:-0.0.0.0}:8000:8000"

kafka:
ports:
- "${CUSTOM_DOCKER_IP:-0.0.0.0}:9092:9092"

data-migration:
volumes:
- ../bite:/srv/app/bite
Expand All @@ -44,6 +48,10 @@ services:
volumes:
- ../bite:/srv/app/bite

mqtt-to-db:
dispatcher:
volumes:
- ../bite:/srv/app/bite

handler:
volumes:
- ../bite:/srv/app/bite
6 changes: 5 additions & 1 deletion docker/docker-compose.prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ services:
volumes:
- ./django/production.py.sample:/srv/app/bite/bite/production.py

mqtt-to-db:
dispatcher:
volumes:
- ./django/production.py.sample:/srv/app/bite/bite/production.py

handler:
volumes:
- ./django/production.py.sample:/srv/app/bite/bite/production.py
38 changes: 35 additions & 3 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ services:
ports:
- "${CUSTOM_DOCKER_IP:-0.0.0.0}:1883:1883"

zookeeper:
image: confluentinc/cp-zookeeper:latest
networks:
- net
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181

kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
networks:
- net
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

ingress:
<<: *service_default
image: nginx:stable-alpine
Expand Down Expand Up @@ -104,13 +128,21 @@ services:
- staticdata:/srv/appdata/bite/static
command: ["python3", "manage.py", "collectstatic", "--noinput"]

mqtt-to-db:
dispatcher:
<<: *service_default
image: daniviga/bite
command: ["python3", "manage.py", "mqtt-to-db"]
command: ["python3", "manage.py", "dispatcher"]
networks:
- net
depends_on:
- broker

handler:
<<: *service_default
image: daniviga/bite
command: ["python3", "manage.py", "handler"]
networks:
- net
depends_on:
- data-migration
- timescale
- broker
4 changes: 2 additions & 2 deletions docker/ntpd/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

FROM alpine:3.15
FROM alpine:3.18

RUN apk update && apk add chrony && \
RUN apk add --no-cache chrony && \
chown -R chrony:chrony /var/lib/chrony
COPY ./chrony.conf /etc/chrony/chrony.conf

Expand Down
2 changes: 1 addition & 1 deletion docker/simulator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

FROM python:3.9-alpine
FROM python:3.11-alpine

RUN pip3 install urllib3 paho-mqtt
COPY ./device_simulator.py /opt/bite/device_simulator.py
Expand Down
2 changes: 1 addition & 1 deletion docker/simulator/device_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def main():
parser.add_argument('-s', '--serial',
default=os.environ.get('IOT_SERIAL'),
help='IoT device serial number')
parser.add_argument('-d', '--delay', metavar='s', type=int,
parser.add_argument('-d', '--delay', metavar='s', type=float,
default=os.environ.get('IOT_DELAY', 10),
help='Delay between requests')
args = parser.parse_args()
Expand Down
2 changes: 1 addition & 1 deletion docs/.badges/python.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ ipython
flake8
pyinstrument
django-debug-toolbar
urllib3
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ djangorestframework
django-health-check
psycopg2-binary
paho-mqtt
asyncio-mqtt
kafka-python
aiomqtt
PyYAML
uritemplate
pygments
Expand Down

0 comments on commit da92935

Please sign in to comment.