Perspicuous documentation for the OpenTwins platform, by Eshan Jayasundara.
What is a digital twin, from a high-level view? Watch This.
- Minikube single-node Kubernetes cluster (for container orchestration)
- kubectl (the Kubernetes command-line interface for interacting with the Minikube cluster)
- Docker (for containerization)
- Helm (to install charts from the Helm repository)
- Linux-based operating system (tested on Ubuntu)
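Optionally, verify that the prerequisites are installed before continuing (exact version numbers will differ):
# quick sanity check of the tooling
minikube version
kubectl version --client
docker --version
helm version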
minikube start --cpus 4 --disk-size 40gb --memory 8192 -p <name-of-the-profile>
Change the <name-of-the-profile> field as you wish, but remember it for later steps.
kubectl create namespace <your-own-name-space>
Replace <your-own-name-space> with a name of your choice, but remember it for future use.
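For example, using opentwins as both the profile name and the namespace (any names work, as long as you reuse them consistently in the later steps):
# example only: "opentwins" is an arbitrary choice for both placeholders
minikube start --cpus 4 --disk-size 40gb --memory 8192 -p opentwins
kubectl create namespace opentwins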
helm repo add ertis https://ertis-research.github.io/Helm-charts/
Install OpenTwins (https://ertis-research.github.io/opentwins/docs/installation/using-helm)
helm upgrade --install opentwins ertis/OpenTwins -n <your-own-name-space> --dependency-update --debug
- To check the status of pods
kubectl get pods -n <your-own-name-space>
- To check the status of services
kubectl get services -n <your-own-name-space>
- To check the deployments
kubectl get deployments -n <your-own-name-space>
You may need to run the above commands repeatedly in another terminal until everything is up.
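Alternatively, you can keep a live view of the pods instead of re-running the commands manually:
# watch pod status until everything is Running (Ctrl+C to exit)
kubectl get pods -n <your-own-name-space> --watch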
kubectl delete pod <pod-name> -n <your-own-name-space>
If a pod gets stuck, take note of its name and run the above command, replacing <pod-name> with the actual pod name and <your-own-name-space> with the namespace you noted earlier.
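If you are unsure which pods are stuck, a quick way to list everything that is not in the Running phase is:
# shows Pending, Failed, etc. pods; healthy Running pods are filtered out
kubectl get pods -n <your-own-name-space> --field-selector=status.phase!=Running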
minikube service -n <your-own-name-space> opentwins-grafana --url -p <name-of-the-profile>
The default username and password are both "admin".
minikube stop -p <name-of-the-profile>
minikube start -p <name-of-the-profile>
Ensure everything is back up and running
Home -> Administration -> Plugins -> OpenTwins
The plugin needs some configuration. The default Ditto username and password are both ditto. For the developer (devops) account, the username is devops and the password is foobar. For the URLs, use the outputs of the following commands.
minikube service -n <your-own-name-space> opentwins-ditto-nginx --url -p <name-of-the-profile>
minikube service -n <your-own-name-space> opentwins-ditto-extended-api --url -p <name-of-the-profile>
Make sure to replace <name-of-the-profile> and <your-own-name-space> with the values you chose earlier.
Here I'm going to create a digital twin for a sensor that measures the temperature, humidity, and CO2 level.
Click on the create new type button, provide the necessary details, and fill in the form. Don't forget to use default_policy as the policy.
Add features
Finally, the preview of the type in the Ditto protocol looks like this:
Click on the create new twin button and provide the necessary namespace and ID. Don't reuse the names you used when creating the type, because twins are instances of the type created above. Under from existing types, choose the type just created as the type of the twin.
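As a rough alternative sketch, the same twin could also be created directly through the Ditto HTTP API instead of the UI. This is only an illustration: <ditto-url> stands for the opentwins-ditto-nginx URL, and the airquality:mysensor thing ID and <default-policy-id> are assumptions that must match your own setup.
# hypothetical example: create a thing via the Ditto API (ditto/ditto are the default credentials)
curl -i -u ditto:ditto -X PUT <ditto-url>/api/2/things/airquality:mysensor \
  -H 'Content-Type: application/json' \
  -d '{
    "policyId": "<default-policy-id>",
    "features": {
      "temperature": { "properties": { "value": null } },
      "humidity": { "properties": { "value": null } },
      "co2": { "properties": { "value": null } }
    }
  }'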
Important:
Replace the URIs with the actual ones from your cluster, including the URL in the curl command. The Python script and this connection request must use the same MQTT topic.
curl -i -u devops:foobar -X POST http://192.168.49.2:30525/api/2/connections -H 'Content-Type: application/json' -d '{
"name": "mymqtt_connection",
"connectionType": "mqtt-5",
"connectionStatus": "open",
"uri": "tcp://192.168.49.2:30511",
"sources": [
{
"addresses": [
"telemetry/#"
],
"consumerCount": 1,
"qos": 1,
"authorizationContext": [
"nginx:ditto"
],
"headerMapping": {
"correlation-id": "{{header:correlation-id}}",
"namespace": "{{ entity:namespace }}",
"content-type": "{{header:content-type}}",
"connection": "{{ connection:id }}",
"id": "{{ entity:id }}",
"reply-to": "{{header:reply-to}}"
},
"replyTarget": {
"address": "{{header:reply-to}}",
"headerMapping": {
"content-type": "{{header:content-type}}",
"correlation-id": "{{header:correlation-id}}"
},
"expectedResponseTypes": [
"response",
"error"
],
"enabled": true
}
}
],
"targets": [],
"clientCount": 1,
"failoverEnabled": true,
"validateCertificates": true,
"processorPoolSize": 1,
"tags": []
}'
Find the Ditto URL with:
minikube service -n <your-own-namespace> opentwins-ditto-nginx --url -p <name-of-the-profile>
Find the Mosquitto URL with:
minikube service -n <your-own-namespace> opentwins-mosquitto --url -p <name-of-the-profile>
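Before wiring Ditto to the broker, you can optionally check that Mosquitto is reachable from your machine. This assumes the mosquitto-clients package is installed locally; the host and port come from the opentwins-mosquitto URL above.
# publish a throwaway message to the broker
mosquitto_pub -h <mosquitto-host> -p <mosquitto-port> -t telemetry/test -m "hello"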
# check connections with
curl -i -X GET -u devops:foobar http://192.168.49.2:30525/api/2/connections
# delete a connection
curl -i -X DELETE -u devops:foobar http://192.168.49.2:30525/api/2/connections/<connection-name>
You may use the Ditto protocol preview of the digital twin to assign the necessary values to the variables in this Python script.
import paho.mqtt.client as mqtt
import random
import time
import json
# Digital twin info
namespace = "airquality"
sensor_name = "mysensor"
# MQTT info
broker = "192.168.49.2" # Replace with your MQTT broker address
port = 30511 # Replace with your MQTT broker port
topic = "telemetry/" # Topic where data will be published
# Authentication info (replace with actual username and password)
username = ""
password = ""
# MQTT connection
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Successful connection")
else:
print(f"Connection failed with code {rc}")
client = mqtt.Client()
client.on_connect = on_connect
client.username_pw_set(username, password)
client.connect(broker, port, 60)
def generate_air_data():
temperature = random.uniform(15, 45)
humidity = random.uniform(0, 100)
co2 = random.uniform(0, 5)
return temperature, humidity, co2
def get_ditto_protocol_value_air(time, temperature, humidity, co2):
return {
"temperature": {
"properties": {
"value": temperature,
"time": time
}
},
"humidity": {
"properties": {
"value": humidity,
"time": time
}
},
"co2": {
"properties": {
"value": co2,
"time": time
}
}
}
def get_ditto_protocol_msg(name, value):
return {
"topic": f"{namespace}/{name}/things/twin/commands/merge",
"headers": {
"content-type": "application/merge-patch+json"
},
"path": "/features",
"value": value
}
# Send data
try:
client.loop_start() # Start the MQTT loop
while True:
t = round(time.time() * 1000) # Unix time in ms
# Sensor twin
temperature, humidity, co2 = generate_air_data()
msg = get_ditto_protocol_msg(sensor_name, get_ditto_protocol_value_air(t, temperature, humidity, co2))
client.publish(topic + namespace + "/" + sensor_name, json.dumps(msg))
print(f"{sensor_name} data published")
time.sleep(5)
except KeyboardInterrupt:
client.loop_stop() # Stop the MQTT loop
client.disconnect()
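A minimal way to run the script, assuming it is saved as publish_air_data.py (the filename is arbitrary). The script uses the paho-mqtt 1.x callback signature, so pinning the library below version 2 avoids a constructor change introduced in 2.0:
# install the only third-party dependency and start publishing
pip install "paho-mqtt<2"
python3 publish_air_data.py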
After configuring everything and running the script, the twin should show the published information, as shown in the image below.
For Kafka to work, it is necessary to install ZooKeeper beforehand. In addition, CMAK, a tool to manage Apache Kafka, will be used to make it easier to use. Then, for the deployment, the pod-zookeeper.yaml, svc-zookeeper.yaml, pod-kafka.yaml, svc-kafka.yaml, deploy-kafka-manager.yaml and svc-kafka-manager.yaml files will be needed. Once you have them, you only need to apply them to the chosen namespace.
# environment variable
export NS=<your-own-name-space> # replace this with your namespace
kubectl apply -f pod-zookeeper.yaml -n $NS
kubectl apply -f svc-zookeeper.yaml -n $NS
kubectl apply -f pod-kafka.yaml -n $NS
kubectl apply -f svc-kafka.yaml -n $NS
kubectl apply -f deploy-kafka-manager.yaml -n $NS
kubectl apply -f svc-kafka-manager.yaml -n $NS
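Before continuing, it is worth confirming that ZooKeeper, Kafka, and the Kafka manager came up next to the OpenTwins pods:
# verify the new pods and services in the namespace
kubectl get pods -n $NS
kubectl get services -n $NS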
After the manual deployment, the services should look like this:
kafka-cluster LoadBalancer
kafka-manager NodePort
opentwins-ditto-extended-api NodePort
opentwins-ditto-gateway ClusterIP
opentwins-ditto-nginx NodePort
opentwins-grafana NodePort
opentwins-influxdb2 NodePort
opentwins-mongodb NodePort
opentwins-mosquitto NodePort
zookeeper NodePort
If any service has a type different from the above, use the command below to edit the service type:
kubectl edit service <service-name> -n <your-own-namespace>
Replace <service-name> and <your-own-namespace> with the actual names.
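As a non-interactive alternative to kubectl edit, the type can also be changed with kubectl patch; here is an example that switches a service to NodePort:
# change the service type without opening an editor
kubectl patch service <service-name> -n <your-own-namespace> -p '{"spec": {"type": "NodePort"}}'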
Important:
Replace the URIs with the actual ones from your cluster, including the URL in the curl command. There are two methods; use only one of them.
- 1st method
curl -i -X POST -u devops:foobar -H 'Content-Type: application/json' --data '{
"targetActorSelection": "/system/sharding/connection",
"headers": {
"aggregate": false
},
"piggybackCommand": {
"type": "connectivity.commands:createConnection",
"connection": {
"id": "kafka-connection",
"connectionType": "kafka",
"connectionStatus": "open",
"failoverEnabled": true,
"uri": "tcp://192.168.49.2:31102",
"specificConfig": {
"bootstrapServers": "192.168.49.2:31102",
"saslMechanism": "plain"
},
"sources": [],
"targets": [
{
"address": "digitaltwins",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": [
"nginx:ditto"
]
}
]
}
}
}' http://192.168.49.2:30525/devops/piggyback/connectivity
- 2nd method
curl -i -u devops:foobar -X POST http://192.168.49.2:30525/api/2/connections -H 'Content-Type: application/json' -d '{
"name": "new-connetion",
"connectionType": "kafka",
"connectionStatus": "open",
"failoverEnabled": true,
"uri": "tcp://192.168.49.2:31102",
"specificConfig": {
"bootstrapServers": "192.168.49.2:31102",
"saslMechanism": "plain"
},
"sources": [],
"targets": [
{
"address": "digitaltwins",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": [
"nginx:ditto"
]
}
]
}'
Find the Ditto URL with:
minikube service -n <your-own-namespace> opentwins-ditto-nginx --url -p <name-of-the-profile>
Find the Kafka URL with:
minikube service -n <your-own-namespace> kafka-cluster --url -p <name-of-the-profile>
Ensure the connections were configured in Ditto:
# check connections with
curl -i -X GET -u devops:foobar http://192.168.49.2:30525/api/2/connections
# delete a connection
curl -i -X DELETE -u devops:foobar http://192.168.49.2:30525/api/2/connections/<connection-name>
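Once a Kafka connection is open and the Python script is publishing, you can optionally confirm that Ditto events reach the digitaltwins topic by running the console consumer inside the Kafka pod. The pod name kafka, the listener localhost:9092, and the availability of kafka-console-consumer.sh on the PATH are assumptions about the image used in pod-kafka.yaml; adjust them to your deployment.
# read a few events from the topic to confirm the pipeline works (Ctrl+C to stop)
kubectl exec -it kafka -n <your-own-namespace> -- kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic digitaltwins --from-beginning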
The InfluxDB pod is deployed by default with the OpenTwins installation on the Minikube cluster you created earlier.
Access the InfluxDB URL with:
kubectl get services -n <your-own-name-space> # list all services
minikube service -n <your-own-name-space> <influxdb-service-name> --url -p <name-of-the-profile>
username: admin
password: password
Generate an API token with all permissions using the InfluxDB UI, then copy it to the clipboard (select the token, right-click, and copy).
Click Create a configuration, choose the "default" bucket and the kafka_consumer plugin, and download the configuration. It already lists kafka_consumer as the input plugin and influxdb_v2 as the output plugin; replace the token, the ClusterIP addresses, and the relevant ports of the Kafka broker and InfluxDB services.
Access the ConfigMaps:
kubectl get configmaps -n <your-own-namespace>
Edit them one by one:
kubectl edit configmap <telegraf> -n <your-own-namespace>
Here <telegraf> stands for the name of any ConfigMap whose name contains the substring "telegraf".
Edit all such ConfigMaps one by one using the downloaded file. Most of the time the result looks like the example below:
data:
telegraf.conf: |
[agent]
collection_jitter = "0s"
debug = true
flush_interval = "10s"
flush_jitter = "0s"
hostname = "" #
interval = "10s"
metric_batch_size = 1000
metric_buffer_limit = 10000
omit_hostname = false
precision = ""
[[outputs.influxdb_v2]]
bucket = "default"
organization = "opentwins"
token = "<your-token-extracted>"
urls = ["<influxdb-url>"]
[[inputs.kafka_consumer]]
brokers = ["<kafka-broker-url>"]
topics = ["digitaltwins"]
consumer_group = "telegraf_metrics_consumers"
offset = "oldest"
data_format = "json"
Don't forget to replace <your-token-extracted> with the actual token.
Find the relevant InfluxDB and Kafka URLs with:
minikube service -n <your-own-namespace> opentwins-influxdb2 --url -p <name-of-the-profile>
minikube service -n <your-own-namespace> kafka-cluster --url -p <name-of-the-profile>
Redeploy the Telegraf deployment (replace the namespace with your own):
kubectl rollout restart deployment telegraf -n <your-own-namespace>
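To confirm the restart picked up the new configuration, watch the rollout and skim the Telegraf logs for the kafka_consumer and influxdb_v2 plugins:
# wait for the restarted pods, then inspect recent log lines
kubectl rollout status deployment telegraf -n <your-own-namespace>
kubectl logs deployment/telegraf -n <your-own-namespace> --tail=50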
Home -> Connections -> Connect data -> Search for InfluxDB
Create a new InfluxDB data source and configure it as shown in the images below.
minikube service -n <your-own-namespace> opentwins-influxdb2 --url -p <name-of-the-profile>
Find the URL with the above command.
Create a new dashboard:
Home -> Dashboards
Choose InfluxDB as the data source for querying and gauge as the visualization. Add a title. Then apply some overrides to change the display names of the gauges. Don't forget to save the changes.
Example query:
import "strings"
from(bucket: "default")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "kafka_consumer")
|> filter(fn: (r) => r["host"] == "telegraf-57cfdcb85c-dgflc")
|> filter(fn: (r) => r["_field"] == "value_co2_properties_value" or r["_field"] == "value_humidity_properties_value" or r["_field"] == "value_temperature_properties_value")
In the line
filter(fn: (r) => r["host"] == "telegraf-57cfdcb85c-dgflc")
telegraf-57cfdcb85c-dgflc is the name of the Telegraf pod. I had to change this whenever I restarted the Minikube cluster (with minikube stop -p <name-of-the-profile> and minikube start -p <name-of-the-profile>), because the Telegraf pod gets a different name each time and the pod name is stored together with the time series data. This issue was fixed by setting the number of replicas of the Telegraf deployment to 1.