Skip to content

Commit

Permalink
Add kairos-bridge to forward NGSI-LD Data from Kafka to Cassandra
Browse files Browse the repository at this point in the history
- Kairosdb bridge is added in docker-compose and built from mqtt-bridge repo
- E2E added as bats tests
- Added Bats test to test target in Makefile

Close Open-IoT-Service-Platform#693
See also Open-IoT-Service-Platform#695 Open-IoT-Service-Platform#665
  • Loading branch information
wagmarcel committed Dec 11, 2022
1 parent 20336dc commit 2d9b716
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ test: OISP_TESTS_SKIP_SCALE ?= "true"
test: prepare-tests test-prep-only
kubectl -n $(NAMESPACE) exec $(DEBUGGER_POD) -c debugger \
-- /bin/bash -c "cd /home/$(CURRENT_DIR_BASE)/tests && make test TERM=xterm NAMESPACE=$(NAMESPACE) OISP_TESTS_SKIP_SCALE=$(OISP_TESTS_SKIP_SCALE)"
@$(call msg,"Starting the e2e bats testing ...");
@cd tests/bats && bats test-*

# ==============
# BUILD COMMANDS
Expand Down
7 changes: 6 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ services:
mqtt-gateway:
image: ${DOCKER_PREFIX:-oisp}/mqtt-gateway:${DOCKER_TAG}
build:
context: ./oisp-mqtt-gw
context: ./oisp-mqtt-gw/
dockerfile: Dockerfile
kafka-bridge:
image: ${DOCKER_PREFIX:-oisp}/kafka-bridge:${DOCKER_TAG}
build:
context: ./oisp-mqtt-gw/KafkaBridge
dockerfile: Dockerfile
grafana:
image: ${DOCKER_PREFIX:-oisp}/grafana:${DOCKER_TAG}
Expand Down
26 changes: 26 additions & 0 deletions kubernetes/templates/kafkabridges/bridge-configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
# yamllint disable rule:line-length
# yamllint disable rule:braces
apiVersion: v1
kind: ConfigMap
metadata:
name: bridge-configmap
namespace: {{ .Release.Namespace }}
labels:
app: bridge-configmap
data:
config.json: |
{
"kafka": {
"brokers": [{{ .Values.kafka.fullBrokerList | quote }}]
},
"kairosdb": {
"topic": {{ .Values.kafkaBridge.kairosdb.listenTopic | quote }},
"hostname": {{ .Values.backend.tsdbUri | quote }},
"port": {{ .Values.backend.tsdbPort | quote }},
"protocol": "http:"
},
"logger": {
"loglevel": "info"
}
}
60 changes: 60 additions & 0 deletions kubernetes/templates/kafkabridges/kairosdb-bridge-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
---
# yamllint disable rule:line-length
# yamllint disable rule:braces
apiVersion: apps/v1
kind: Deployment
metadata:
name: kairosdb-bridge
namespace: {{ .Release.Namespace }}
labels:
app: kairosdb-bridge
spec:
replicas: {{ .Values.kafkaBridge.kairosdb.replicaCount }}
selector:
matchLabels:
app: kairosdb-bridge
template:
metadata:
labels:
app: kairosdb-bridge
annotations:
checksum/config: {{ include (print $.Template.BasePath "/kafkabridges/bridge-configmap.yaml") . | sha256sum }}
spec:
containers:
- name: kairosdb-bridge
{{ if .Values.use_local_registry }}
image: k3d-iff.localhost:12345/{{ .Values.imagePrefix }}/kafka-bridge:{{ .Values.tag }}
{{ else }}
image: {{ .Values.imagePrefix }}/kafka-bridge:{{ .Values.tag }}
{{ end }}
command: ["node"]
args: ["/opt/kairosdb/app.js"]
imagePullPolicy: IfNotPresent
livenessProbe:
exec:
command:
- cat
- /tmp/healthy
initialDelaySeconds: 300
readinessProbe:
exec:
command:
- cat
- /tmp/ready
initialDelaySeconds: 5
volumeMounts:
- name: config
mountPath: /opt/config
readOnly: true
resources:
volumes:
- name: config
configMap:
# Provide the name of the ConfigMap you want to mount.
name: bridge-configmap
# An array of keys from the ConfigMap to create as files
items:
- key: "config.json"
path: "config.json"
imagePullSecrets:
- name: dockercred
6 changes: 6 additions & 0 deletions kubernetes/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ kafka:
retries: 10
maxPayloadSize: "20000000"

kafkaBridge:
kairosdb:
repliacCount: 1
listenTopic: "iff.ngsild.attributes"


zookeeper:
namespace: zookeeper
name: zookeeper
Expand Down
32 changes: 32 additions & 0 deletions tests/bats/test-kairos-bridge/1-kairosdb-bridge-up.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env bats
# Copyright (c) 2022 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

load "../lib/utils"
load "../lib/detik"

# shellcheck disable=SC2034 # these variables are used by detik
DETIK_CLIENT_NAME="kubectl"
# shellcheck disable=SC2034
DETIK_CLIENT_NAMESPACE="oisp"
# shellcheck disable=SC2034
DETIK_DEBUG="true"


@test "verify that kairosdb bridge is up and running" {
run try "at most 30 times every 60s to find 1 pod named 'kairosdb-bridge' with 'status.containerStatuses[0].ready' being 'true'"
[ "$status" -eq 0 ]

}
183 changes: 183 additions & 0 deletions tests/bats/test-kairos-bridge/2-kairosdb-bridge-test.bats
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#!/usr/bin/env bats

if [ -z "${SELF_HOSTED_RUNNER}" ]; then
SUDO="sudo -E"
fi
DEBUG=${DEBUG:-false} # set this to true to disable starting and stopping of kubefwd
SKIP= # set =skip to skip all test (and only remove $SKIP from the test you are interested in)
OISPNAMESPACE=oisp
IFFNAMESPACE=iff
KAIROSDB_SERVICE=$(kubectl -n oisp -l app=kairosdb get services -o jsonpath='{.items[0].metadata.name}')
KAIROSDB_PORT=$(kubectl -n oisp -l app=kairosdb get services -o jsonpath='{.items[0].spec.ports[0].port}')
KAIROSDB_URL=${KAIROSDB_SERVICE}.${OISPNAMESPACE}:${KAIROSDB_PORT}
KAIROSDB_PATH_GET_DATAPOINTS=/api/v1/datapoints/query
ATTRIBUTES_PROPERTY=/tmp/property.txt
ATTRIBUTES_PROPERTY2=/tmp/property2.txt
ATTRIBUTES_RELATIONSHIP=/tmp/relationship.txt
KAIROSDB_METRICS=/tmp/metrics
KAIROSDB_METRICS_RELATIONSHIP=/tmp/metrics_relationship
ATTRIBUTES_TOPIC=iff.ngsild.attributes
KAFKA_BOOTSTRAP=$(kubectl -n oisp get cm/oisp-config -o jsonpath='{.data.kafka}'| jq .uri| tr -d '"')
URN=urn:iff:serialnr:1
URN2=urn:iff:serialnr:2
STATE=https://industry-fusion.com/types/v0.9/state
REL=https://industry-fusion.com/types/v0.9/relationship
IDSTATE="${URN}\\\\${STATE}"
IDREL="${URN}\\\\${REL}"
VALUE="state"
VALUE2="1"
PROPERTY=https://uri.etsi.org/ngsi-ld/Property
RELATIONSHIP=https://uri.etsi.org/ngsi-ld/Relationship

cat << EOF > ${ATTRIBUTES_PROPERTY}
{
"id": "${IDSTATE}",
"entityId": "${URN}",
"name": "${STATE}",
"type": "${PROPERTY}",
"https://uri.etsi.org/ngsi-ld/hasValue": "${VALUE}",
"nodeType": "@id",
"index": 0
}
EOF

cat << EOF > ${ATTRIBUTES_PROPERTY2}
{
"id": "${IDSTATE}",
"entityId": "${URN}",
"name": "${STATE}",
"type": "${PROPERTY}",
"https://uri.etsi.org/ngsi-ld/hasValue": "${VALUE2}",
"nodeType": "@value",
"valueType": "http://www.w3.org/2001/XMLSchema#string",
"index": 0
}
EOF

cat << EOF > ${ATTRIBUTES_RELATIONSHIP}
{
"id": "${IDREL}",
"entityId": "${URN}",
"name": "${REL}",
"type": "${RELATIONSHIP}",
"https://uri.etsi.org/ngsi-ld/hasObject": "${URN2}",
"nodeType": "@id",
"index": 0
}
EOF

cat << EOF > ${KAIROSDB_METRICS}
{
"start_relative": {
"value": 5,
"unit": "seconds"
},
"metrics": [
{
"name": "default\\\\${IDSTATE}",
"limit": 10
}
]
}
EOF

cat << EOF > ${KAIROSDB_METRICS_RELATIONSHIP}
{
"start_relative": {
"value": 5,
"unit": "seconds"
},
"metrics": [
{
"name": "default\\\\${IDREL}",
"limit": 10
}
]
}
EOF

# send data to kafka bridge
# $1: file to send
# $2: kafka topic
send_to_kafka_bridge() {
tr -d '\n' <"$1" | kafkacat -P -t "$2" -b "${KAFKA_BOOTSTRAP}"
}

# receive datapoints
# $1: payload to post
get_datapoints() {
curl -vv -X POST "${KAIROSDB_URL}${KAIROSDB_PATH_GET_DATAPOINTS}" -d @"$1"
}


# check sample[0]
# $1: object
# $2: expected sample_size
# $3: expected name
# $4: expected value
# $5: expected type
# $6: expected nodeType
check_sample_size() {
echo "# Check_sample_size with $1 $2 $3 $4 $5"
sample_size=$(echo "$1" | jq '.queries[0].sample_size' | tr -d '"')
[ "$sample_size" = "$2" ] || { echo "# wrong value for field $2: $sample_size!=$2"; return 1; }
name=$(echo "$1" | jq '.queries[0].results[0].name' | tr -d '"')
[ "$name" = "$3" ] || { echo "# wrong value for field $3: $name!=$3"; return 1; }
value=$(echo "$1" | jq '.queries[0].results[0].values[0][1]' | tr -d '"')
[ "$value" = "$4" ] || { echo "# wrong value for field $4: $value!=$4"; return 1; }
type=$(echo "$1" | jq '.queries[0].results[0].tags.type[0]' | tr -d '"')
[ "$type" = "$5" ] || { echo "# wrong value for field $5: $type!=$5"; return 1; }
nodeType=$(echo "$1" | jq '.queries[0].results[0].tags.nodeType[0]' | tr -d '"')
[ "$nodeType" = "$6" ] || { echo "# wrong value for field $6: $nodeType!=$6"; return 1; }
return 0

}

setup() {
kubectl -n oisp label svc -l "app=kairosdb" app.kubernetes.io/name=kairosdb --overwrite=true
# shellcheck disable=SC2086
[ $DEBUG = "true" ] || (exec ${SUDO} kubefwd -n ${IFFNAMESPACE} -n ${OISPNAMESPACE} -l "app.kubernetes.io/name in (kafka, kairosdb)" svc) &
echo "# launched kubefwd for kafka, wait some seconds to give kubefwd to launch the services"
sleep 3
}
teardown(){
echo "# now killing kubefwd"
# shellcheck disable=SC2086
[ $DEBUG = "true" ] || ${SUDO} killall kubefwd
}

@test "verify kairosdb-bridge is forwarding iri Property" {
$SKIP
echo "# Sending property to Kafka"
send_to_kafka_bridge ${ATTRIBUTES_PROPERTY} ${ATTRIBUTES_TOPIC}
sleep 2
data=$(get_datapoints "${KAIROSDB_METRICS}")
echo "# result: $data"
echo "# Now checking result."
run check_sample_size "$data" "1" "default\\\\$IDSTATE" "$VALUE" "$PROPERTY" "@id"
[ "$status" -eq "0" ] || (echo -n "$output"; false)
}

@test "verify kairosdb-bridge is forwarding Relationship" {
$SKIP
echo "# Sending relationship to Kafka"
send_to_kafka_bridge ${ATTRIBUTES_RELATIONSHIP} ${ATTRIBUTES_TOPIC}
sleep 2
data=$(get_datapoints "${KAIROSDB_METRICS_RELATIONSHIP}")
echo "# result: $data"
echo "# Now checking result."
run check_sample_size "$data" "1" "default\\\\$IDREL" "$URN2" "$RELATIONSHIP" "@id"
[ "$status" -eq "0" ] || (echo -n "$output"; false)
}

@test "verify kairosdb-bridge is forwarding value Property" {
$SKIP
echo "# Sending property to Kafka"
send_to_kafka_bridge ${ATTRIBUTES_PROPERTY2} ${ATTRIBUTES_TOPIC}
sleep 2
data=$(get_datapoints "${KAIROSDB_METRICS}")
echo "# result: $data"
echo "# Now checking result."
run check_sample_size "$data" "1" "default\\\\$IDSTATE" "$VALUE2" "$PROPERTY" "@value"
[ "$status" -eq "0" ] || (echo -n "$output"; false)
}

0 comments on commit 2d9b716

Please sign in to comment.