This repository has been archived by the owner on Jan 8, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add kairos-bridge to forward NGSI-LD Data from Kafka to Cassandra
- Kairosdb bridge is added in docker-compose and built from mqtt-bridge repo - E2E added as bats tests - Added Bats test to test target in Makefile Close #693 See also #695 #665
- Loading branch information
Showing
7 changed files
with
315 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
--- | ||
# yamllint disable rule:line-length | ||
# yamllint disable rule:braces | ||
apiVersion: v1 | ||
kind: ConfigMap | ||
metadata: | ||
name: bridge-configmap | ||
namespace: {{ .Release.Namespace }} | ||
labels: | ||
app: bridge-configmap | ||
data: | ||
config.json: | | ||
{ | ||
"kafka": { | ||
"brokers": [{{ .Values.kafka.fullBrokerList | quote }}] | ||
}, | ||
"kairosdb": { | ||
"topic": {{ .Values.kafkaBridge.kairosdb.listenTopic | quote }}, | ||
"hostname": {{ .Values.backend.tsdbUri | quote }}, | ||
"port": {{ .Values.backend.tsdbPort | quote }}, | ||
"protocol": "http:" | ||
}, | ||
"logger": { | ||
"loglevel": "info" | ||
} | ||
} |
60 changes: 60 additions & 0 deletions
60
kubernetes/templates/kafkabridges/kairosdb-bridge-deployment.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
--- | ||
# yamllint disable rule:line-length | ||
# yamllint disable rule:braces | ||
apiVersion: apps/v1 | ||
kind: Deployment | ||
metadata: | ||
name: kairosdb-bridge | ||
namespace: {{ .Release.Namespace }} | ||
labels: | ||
app: kairosdb-bridge | ||
spec: | ||
replicas: {{ .Values.kafkaBridge.kairosdb.replicaCount }} | ||
selector: | ||
matchLabels: | ||
app: kairosdb-bridge | ||
template: | ||
metadata: | ||
labels: | ||
app: kairosdb-bridge | ||
annotations: | ||
checksum/config: {{ include (print $.Template.BasePath "/kafkabridges/bridge-configmap.yaml") . | sha256sum }} | ||
spec: | ||
containers: | ||
- name: kairosdb-bridge | ||
{{ if .Values.use_local_registry }} | ||
image: k3d-iff.localhost:12345/{{ .Values.imagePrefix }}/kafka-bridge:{{ .Values.tag }} | ||
{{ else }} | ||
image: {{ .Values.imagePrefix }}/kafka-bridge:{{ .Values.tag }} | ||
{{ end }} | ||
command: ["node"] | ||
args: ["/opt/kairosdb/app.js"] | ||
imagePullPolicy: IfNotPresent | ||
livenessProbe: | ||
exec: | ||
command: | ||
- cat | ||
- /tmp/healthy | ||
initialDelaySeconds: 300 | ||
readinessProbe: | ||
exec: | ||
command: | ||
- cat | ||
- /tmp/ready | ||
initialDelaySeconds: 5 | ||
volumeMounts: | ||
- name: config | ||
mountPath: /opt/config | ||
readOnly: true | ||
resources: | ||
volumes: | ||
- name: config | ||
configMap: | ||
# Provide the name of the ConfigMap you want to mount. | ||
name: bridge-configmap | ||
# An array of keys from the ConfigMap to create as files | ||
items: | ||
- key: "config.json" | ||
path: "config.json" | ||
imagePullSecrets: | ||
- name: dockercred |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
#!/usr/bin/env bats | ||
# Copyright (c) 2022 Intel Corporation | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
load "../lib/utils" | ||
load "../lib/detik" | ||
|
||
# shellcheck disable=SC2034 # these variables are used by detik | ||
DETIK_CLIENT_NAME="kubectl" | ||
# shellcheck disable=SC2034 | ||
DETIK_CLIENT_NAMESPACE="oisp" | ||
# shellcheck disable=SC2034 | ||
DETIK_DEBUG="true" | ||
|
||
|
||
@test "verify that kairosdb bridge is up and running" { | ||
run try "at most 30 times every 60s to find 1 pod named 'kairosdb-bridge' with 'status.containerStatuses[0].ready' being 'true'" | ||
[ "$status" -eq 0 ] | ||
|
||
} |
183 changes: 183 additions & 0 deletions
183
tests/bats/test-kairos-bridge/2-kairosdb-bridge-test.bats
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
#!/usr/bin/env bats | ||
|
||
if [ -z "${SELF_HOSTED_RUNNER}" ]; then | ||
SUDO="sudo -E" | ||
fi | ||
DEBUG=${DEBUG:-false} # set this to true to disable starting and stopping of kubefwd | ||
SKIP= # set =skip to skip all test (and only remove $SKIP from the test you are interested in) | ||
OISPNAMESPACE=oisp | ||
IFFNAMESPACE=iff | ||
KAIROSDB_SERVICE=$(kubectl -n oisp -l app=kairosdb get services -o jsonpath='{.items[0].metadata.name}') | ||
KAIROSDB_PORT=$(kubectl -n oisp -l app=kairosdb get services -o jsonpath='{.items[0].spec.ports[0].port}') | ||
KAIROSDB_URL=${KAIROSDB_SERVICE}.${OISPNAMESPACE}:${KAIROSDB_PORT} | ||
KAIROSDB_PATH_GET_DATAPOINTS=/api/v1/datapoints/query | ||
ATTRIBUTES_PROPERTY=/tmp/property.txt | ||
ATTRIBUTES_PROPERTY2=/tmp/property2.txt | ||
ATTRIBUTES_RELATIONSHIP=/tmp/relationship.txt | ||
KAIROSDB_METRICS=/tmp/metrics | ||
KAIROSDB_METRICS_RELATIONSHIP=/tmp/metrics_relationship | ||
ATTRIBUTES_TOPIC=iff.ngsild.attributes | ||
KAFKA_BOOTSTRAP=$(kubectl -n oisp get cm/oisp-config -o jsonpath='{.data.kafka}'| jq .uri| tr -d '"') | ||
URN=urn:iff:serialnr:1 | ||
URN2=urn:iff:serialnr:2 | ||
STATE=https://industry-fusion.com/types/v0.9/state | ||
REL=https://industry-fusion.com/types/v0.9/relationship | ||
IDSTATE="${URN}\\\\${STATE}" | ||
IDREL="${URN}\\\\${REL}" | ||
VALUE="state" | ||
VALUE2="1" | ||
PROPERTY=https://uri.etsi.org/ngsi-ld/Property | ||
RELATIONSHIP=https://uri.etsi.org/ngsi-ld/Relationship | ||
|
||
cat << EOF > ${ATTRIBUTES_PROPERTY} | ||
{ | ||
"id": "${IDSTATE}", | ||
"entityId": "${URN}", | ||
"name": "${STATE}", | ||
"type": "${PROPERTY}", | ||
"https://uri.etsi.org/ngsi-ld/hasValue": "${VALUE}", | ||
"nodeType": "@id", | ||
"index": 0 | ||
} | ||
EOF | ||
|
||
cat << EOF > ${ATTRIBUTES_PROPERTY2} | ||
{ | ||
"id": "${IDSTATE}", | ||
"entityId": "${URN}", | ||
"name": "${STATE}", | ||
"type": "${PROPERTY}", | ||
"https://uri.etsi.org/ngsi-ld/hasValue": "${VALUE2}", | ||
"nodeType": "@value", | ||
"valueType": "http://www.w3.org/2001/XMLSchema#string", | ||
"index": 0 | ||
} | ||
EOF | ||
|
||
cat << EOF > ${ATTRIBUTES_RELATIONSHIP} | ||
{ | ||
"id": "${IDREL}", | ||
"entityId": "${URN}", | ||
"name": "${REL}", | ||
"type": "${RELATIONSHIP}", | ||
"https://uri.etsi.org/ngsi-ld/hasObject": "${URN2}", | ||
"nodeType": "@id", | ||
"index": 0 | ||
} | ||
EOF | ||
|
||
cat << EOF > ${KAIROSDB_METRICS} | ||
{ | ||
"start_relative": { | ||
"value": 5, | ||
"unit": "seconds" | ||
}, | ||
"metrics": [ | ||
{ | ||
"name": "default\\\\${IDSTATE}", | ||
"limit": 10 | ||
} | ||
] | ||
} | ||
EOF | ||
|
||
cat << EOF > ${KAIROSDB_METRICS_RELATIONSHIP} | ||
{ | ||
"start_relative": { | ||
"value": 5, | ||
"unit": "seconds" | ||
}, | ||
"metrics": [ | ||
{ | ||
"name": "default\\\\${IDREL}", | ||
"limit": 10 | ||
} | ||
] | ||
} | ||
EOF | ||
|
||
# send data to kafka bridge | ||
# $1: file to send | ||
# $2: kafka topic | ||
send_to_kafka_bridge() { | ||
tr -d '\n' <"$1" | kafkacat -P -t "$2" -b "${KAFKA_BOOTSTRAP}" | ||
} | ||
|
||
# receive datapoints | ||
# $1: payload to post | ||
get_datapoints() { | ||
curl -vv -X POST "${KAIROSDB_URL}${KAIROSDB_PATH_GET_DATAPOINTS}" -d @"$1" | ||
} | ||
|
||
|
||
# check sample[0] | ||
# $1: object | ||
# $2: expected sample_size | ||
# $3: expected name | ||
# $4: expected value | ||
# $5: expected type | ||
# $6: expected nodeType | ||
check_sample_size() { | ||
echo "# Check_sample_size with $1 $2 $3 $4 $5" | ||
sample_size=$(echo "$1" | jq '.queries[0].sample_size' | tr -d '"') | ||
[ "$sample_size" = "$2" ] || { echo "# wrong value for field $2: $sample_size!=$2"; return 1; } | ||
name=$(echo "$1" | jq '.queries[0].results[0].name' | tr -d '"') | ||
[ "$name" = "$3" ] || { echo "# wrong value for field $3: $name!=$3"; return 1; } | ||
value=$(echo "$1" | jq '.queries[0].results[0].values[0][1]' | tr -d '"') | ||
[ "$value" = "$4" ] || { echo "# wrong value for field $4: $value!=$4"; return 1; } | ||
type=$(echo "$1" | jq '.queries[0].results[0].tags.type[0]' | tr -d '"') | ||
[ "$type" = "$5" ] || { echo "# wrong value for field $5: $type!=$5"; return 1; } | ||
nodeType=$(echo "$1" | jq '.queries[0].results[0].tags.nodeType[0]' | tr -d '"') | ||
[ "$nodeType" = "$6" ] || { echo "# wrong value for field $6: $nodeType!=$6"; return 1; } | ||
return 0 | ||
|
||
} | ||
|
||
setup() { | ||
kubectl -n oisp label svc -l "app=kairosdb" app.kubernetes.io/name=kairosdb --overwrite=true | ||
# shellcheck disable=SC2086 | ||
[ $DEBUG = "true" ] || (exec ${SUDO} kubefwd -n ${IFFNAMESPACE} -n ${OISPNAMESPACE} -l "app.kubernetes.io/name in (kafka, kairosdb)" svc) & | ||
echo "# launched kubefwd for kafka, wait some seconds to give kubefwd to launch the services" | ||
sleep 3 | ||
} | ||
teardown(){ | ||
echo "# now killing kubefwd" | ||
# shellcheck disable=SC2086 | ||
[ $DEBUG = "true" ] || ${SUDO} killall kubefwd | ||
} | ||
|
||
@test "verify kairosdb-bridge is forwarding iri Property" { | ||
$SKIP | ||
echo "# Sending property to Kafka" | ||
send_to_kafka_bridge ${ATTRIBUTES_PROPERTY} ${ATTRIBUTES_TOPIC} | ||
sleep 2 | ||
data=$(get_datapoints "${KAIROSDB_METRICS}") | ||
echo "# result: $data" | ||
echo "# Now checking result." | ||
run check_sample_size "$data" "1" "default\\\\$IDSTATE" "$VALUE" "$PROPERTY" "@id" | ||
[ "$status" -eq "0" ] || (echo -n "$output"; false) | ||
} | ||
|
||
@test "verify kairosdb-bridge is forwarding Relationship" { | ||
$SKIP | ||
echo "# Sending relationship to Kafka" | ||
send_to_kafka_bridge ${ATTRIBUTES_RELATIONSHIP} ${ATTRIBUTES_TOPIC} | ||
sleep 2 | ||
data=$(get_datapoints "${KAIROSDB_METRICS_RELATIONSHIP}") | ||
echo "# result: $data" | ||
echo "# Now checking result." | ||
run check_sample_size "$data" "1" "default\\\\$IDREL" "$URN2" "$RELATIONSHIP" "@id" | ||
[ "$status" -eq "0" ] || (echo -n "$output"; false) | ||
} | ||
|
||
@test "verify kairosdb-bridge is forwarding value Property" { | ||
$SKIP | ||
echo "# Sending property to Kafka" | ||
send_to_kafka_bridge ${ATTRIBUTES_PROPERTY2} ${ATTRIBUTES_TOPIC} | ||
sleep 2 | ||
data=$(get_datapoints "${KAIROSDB_METRICS}") | ||
echo "# result: $data" | ||
echo "# Now checking result." | ||
run check_sample_size "$data" "1" "default\\\\$IDSTATE" "$VALUE2" "$PROPERTY" "@value" | ||
[ "$status" -eq "0" ] || (echo -n "$output"; false) | ||
} |