Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Removed rosutils.py and test.py, Documentation
Browse files Browse the repository at this point in the history
Also added checks for responses on the Context-Broker and some preperations for Issue #3
  • Loading branch information
iml-dlux committed Jan 7, 2019
1 parent e4093ce commit 6872f73
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 254 deletions.
95 changes: 71 additions & 24 deletions scripts/include/contextbroker/cbPublisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,79 +21,126 @@
__status__ = "Developement"

import os
import copy

import json
import rospy
import requests

from include.logger import Log
from include.constants import DATA_CONTEXTBROKER
from include.FiwareObjectConverter.objectFiwareConverter import ObjectFiwareConverter


CB_BASE_URL = "http://{}:{}/v2/entities/".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
CB_HEADER = {'Content-Type': 'application/json'}

class CbPublisher(object):
''' The CbPublisher handles the Enities on CONTEXT_BROKER / v2 / entities .
It creates not creaed Entities and updates their attributes via 'publishToCB'.
On Shutdown the tracked Entities are deleted.
Also the rawMsg is converted here via the Object Converter
THIS IS THE ONLY FILE WHICH OPERATES ON /v2/entities
'''

# Keeps track of the posted Content on the ContextBroker
# via posted_history[ROBOT_ID][TOPIC]
posted_history = {}

def publishToCB(self, robotID, topic, datatype, rawMsg, msgDefintionDict):
#TODO DL set publish frequency!
def publishToCB(self, robotID, topic, rawMsg, msgDefintionDict):
''' This is the actual publish-Routine which updates and creates Entities on the
ContextBroker. It also keeps track via posted_history on already posted entities and topics
robotID: A string corresponding to the Robot-Id
topic: Also a string, corresponding to the topic of the robot
rawMsg: the raw data directly obtained from rospy
msgDefintionDict: The Definition as obtained FROM ros2Definition TODO DL
TODO DL During Runtime an Entitiy might get deleted, check it here!
TODO DL set publish frequency
'''
# if struct not initilized, intitilize it even on ContextBroker!
if robotID not in self.posted_history:
self.posted_history[robotID] = {}
self.posted_history[robotID]['type'] = 'ROBOT'
self.posted_history[robotID]['id'] = robotID
# Intitialize Entitiy/Robot-Construct on ContextBroker
jsonStr = ObjectFiwareConverter.obj2Fiware(self.posted_history[robotID], ind=0, ignorePythonMetaData=True)
# TODO Error Checking!
url = "http://{}:{}/v2/entities".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
response = requests.post(url, data=jsonStr, headers={'Content-Type': 'application/json'})
response = requests.post(CB_BASE_URL, data=jsonStr, headers=CB_HEADER)
self._responseCheck(response, attrAction=0, topEnt=robotID)

if topic not in self.posted_history[robotID]:
self.posted_history[robotID][topic] = {}

# Check if descriptions are already added
# Check if descriptions are already added, if not execute again with descriptions!
if 'descriptions' not in self.posted_history[robotID]:
self.posted_history[robotID]['descriptions'] = self.loadDescriptions(robotID)
self.posted_history[robotID]['descriptions'] = self._loadDescriptions(robotID)
if self.posted_history[robotID]['descriptions'] is not None:
self.publishToCB(robotID, 'descriptions', self.posted_history[robotID]['descriptions'], None)

# check if previous posted topic type is the same, iff not, we do not post it to the context broker
if self.posted_history[robotID][topic] != {} and rawMsg._type != self.posted_history[robotID][topic]._type:
Log("WARNING", "Received Msg-Type '{}' but expected '{}' on Topic '{}'".format(rawMsg._type, self.posted_history[robotID][topic]._type, topic))
if (self.posted_history[robotID][topic] != {} and topic != "descriptions"
and rawMsg._type != self.posted_history[robotID][topic]._type):
Log("ERROR", "Received Msg-Type '{}' but expected '{}' on Topic '{}'".format(rawMsg._type, self.posted_history[robotID][topic]._type, topic))
return


# Replace previous rawMsg with current one
self.posted_history[robotID][topic] = rawMsg

# Set Definition-Dict TODO DL do not rebuild every definition Dict!
# Set Definition-Dict
definitionDict = {}
definitionDict[topic] = msgDefintionDict
completeJsonStr = ObjectFiwareConverter.obj2Fiware(self.posted_history[robotID], ind=0, dataTypeDict=definitionDict, ignorePythonMetaData=True)

# format json, so that the contextbroker accept it.
# format json, so that the contextbroker accepts it.
partJsonStr = json.dumps({
topic: json.loads(completeJsonStr)[topic]
})


# TODO Error Checking!
url = "http://{}:{}/v2/entities/{}/attrs".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"], robotID)
response = requests.post(url, data=partJsonStr, headers={'Content-Type': 'application/json'})
# Update attribute on ContextBroker
response = requests.post(CB_BASE_URL + robotID + "/attrs", data=partJsonStr, headers=CB_HEADER)
self._responseCheck(response, attrAction=1, topEnt=topic)


def unpublishALLFromCB(self):
# Removes all on start-up created entities on shutdown
puburl = "http://{}:{}/v2/entities/".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
''' Removes all previously tracked Entities/Robots on ContextBroker
'''
for robotID in self.posted_history:
response = requests.delete(puburl + robotID)
# TODO DL ERRORCHecking!
response = requests.delete(CB_BASE_URL + robotID)
self._responseCheck(response, attrAction=2, topEnt=robotID)


def loadDescriptions(self, robotID):
# TODO DL refactor and no hardcoded reference to file!
def _loadDescriptions(self, robotID):
''' Simply load the descriptions from the 'robotdescriptions.json'-file and
return its value
robotID: The Robot-Id-String
'''
# TODO DL no hardcoded reference to files!
current_path = os.path.dirname(os.path.abspath(__file__))
json_path = current_path.replace("scripts/include/contextbroker", "config/robotdescriptions.json")
description_data = json.load(open(json_path))

# Check if a robotID has descriptions
if robotID in description_data:
if 'descriptions' in description_data[robotID]:
return description_data[robotID]['descriptions']

return None
return None


def _responseCheck(self, response, attrAction=0, topEnt=None):
''' Check if Response is ok (2XX and some 3XX). If not print an individual Error.
response: the actual response
attrAction: One of [0, 1, 2] which maps to -> [Creation, Update, Deletion]
topEnt: the String of an Entity or a topic, which was used
'''
if not response.ok:
if attrAction == 0:
Log("WARNING", "Could not create Entitiy/Robot {} in Contextbroker. Maybe it already exists?".format(topEnt))
elif attrAction == 1:
Log("ERROR", "Cannot update attributes in Contextbroker for topic: {}".format(topEnt))
else:
Log("WARNING", "Could not delete Entitiy/Robot {} in Contextbroker.".format(topEnt))
1 change: 1 addition & 0 deletions scripts/include/contextbroker/cbQueryBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class CbQueryBuilder(object):
# TODO DL used in requestHandler
## \brief Query data to context broker
def findById(self, entity_id, data_type="ROBOT", isPattern=False):
# TODO DL change to v2, maybe not working?
## \brief Get entity data from context broker
# \param entity name (can be regular expression)
# \param entity type
Expand Down
138 changes: 90 additions & 48 deletions scripts/include/contextbroker/cbSubscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,100 +23,142 @@
import time
import thread
import requests
import json

from include.FiwareObjectConverter.objectFiwareConverter import ObjectFiwareConverter

from include.constants import *
from include.constants import DATA_CONTEXTBROKER, IP, SERVER_PORT
from include.logger import Log


CB_BASE_URL = "http://{}:{}".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
FIROS_NOTIFY_URL = "http://{}:{}/firos".format(IP, SERVER_PORT) # TODO DL HTTP?

class CbSubscriber(object):
## \brief Context broker subscription handler
subscriptions = {} # TODO DL remove?
subscriptionThreads = {}
''' The CbSubscriber handles the subscriptions on the ContextBroker.
Only the url CONTEXT_BROKER / v2 / subcriptions is used here!
As on CbPublisher on shutdown all subscriptions are deleted form
ContextBroker.
this Objects also converts the received data from ContextBroker
back into a Python-Object.
Here each topic of an robot is subscribed seperately.
THIS IS THE ONLY FILE WHICH OPERATES ON /v2/subscriptions
'''

# Saves the subscriptions IDs returned from ContextBroker.
# Follwoing Structure: subscriptionIds[ROBOT_ID][TOPIC] returns a sub-Id in String
subscriptionIds = {}
refresh_thread = None

def subscribeToCB(self, robotID, topic):
# If not already subscribed, start a new thread which handles the subscription
def subscribeToCB(self, robotID, topicList):
''' This method starts for each topic an own thread, which handles the subscription
robotID: The string of the robotID
topicList: A list of topics, corresponding to the robotID
'''
# If not already subscribed, start a new thread which handles the subscription for each topic for an robot.
# And only If the topic list is not empty!
if robotID not in self.subscriptions and topic:
Log("INFO", "Subscribing on context broker to " + robotID + " and topics: " + str(topic))
self.subscriptionThreads[robotID] = thread.start_new_thread(self.subscribeThread, (robotID, topic))
#Start Thread via subscription
if robotID not in self.subscriptionIds and topicList:
Log("INFO", "Subscribing on Context-Broker to " + robotID + " and topics: " + str(topicList))
self.subscriptionIds[robotID] = {}
for topic in topicList:
thread.start_new_thread(self.subscribeThread, (robotID, topic)) #Start Thread via subscription


def unsubscribeALLFromCB(self):
suburl = "http://{}:{}".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])
''' Simply unsubscribed from all tracked subscriptions
'''
for robotID in self.subscriptionIds:
response = requests.delete(suburl + self.subscriptionIds[robotID])
# TODO DL Error Handling!
for topic in self.subscriptionIds[robotID]:
response = requests.delete(CB_BASE_URL + self.subscriptionIds[robotID][topic])
self._checkResponse(response, subID=self.subscriptionIds[robotID][topic])


def subscribeThread(self, robotID, topic):
''' A Subscription-Thread. It Life-Cycle is as follows:
-> Subscribe -> Delete old Subs-ID -> Save new Subs-ID -> Wait ->
def subscribeThread(self, robotID, topicList):
# Subscribe New, Unsubscribe old and sleep. If time has passed re-do!
suburl = "http://{}:{}".format(DATA_CONTEXTBROKER["ADDRESS"], DATA_CONTEXTBROKER["PORT"])

robotID: A string corresponding to the robotID
topic: The Topic (string) to subscribe to.
'''
while True:
# Subscribe
jsonData = self.subscribeJSONGenerator(robotID, topicList)
response = requests.post(suburl + "/v2/subscriptions", data=jsonData, headers={'Content-Type': 'application/json'})
jsonData = self.subscribeJSONGenerator(robotID, topic)
response = requests.post(CB_BASE_URL + "/v2/subscriptions", data=jsonData, headers={'Content-Type': 'application/json'})
self._checkResponse(response, created=True, robTop=(robotID, topic))

newSubID = response.headers['Location'] # <- get subscription-ID

# Unsubscribe
if robotID in self.subscriptionIds:
response = requests.delete(suburl + self.subscriptionIds[robotID])
self.subscriptionIds[robotID] = newSubID

#Sleep
if robotID in self.subscriptionIds and topic in self.subscriptionIds[robotID]:
response = requests.delete(CB_BASE_URL + self.subscriptionIds[robotID][topic])
self._checkResponse(response, subID=self.subscriptionIds[robotID][topic])

# Save new ID
self.subscriptionIds[robotID][topic] = newSubID

# Wait
time.sleep(290) # sleep TODO DL from config loaded seconds
Log("INFO", "Refreshing Subscription for " + robotID + " and topics: " + str(topicList))


# TODO DL Error Handling!
Log("INFO", "Refreshing Subscription for " + robotID + " and topics: " + str(topic))


def subscribeJSONGenerator(self, robotID, topic):
''' This method returns the correct JSON-format to subscribe to the ContextBroker.
The Expiration-Date/Throttle and Type of robots is retreived here via configuration (TODO DL)
def subscribeJSONGenerator(self, robotID, topicList):
# This method generates the JSON for the 'v2/subscriptions'-API of Orion CB
robotID: The String of the Robot-Id.
topic: The actual topic to subscribe to.
'''
# This struct correspondes to following JSON-format:
# https://fiware-orion.readthedocs.io/en/master/user/walkthrough_apiv2/index.html#subscriptions
struct = {
"subject": {
"entities": [
{
"id": str(robotID),
"type": "ROBOT" # Type is currently always a robot! #TODO DL load from cfg
"type": "ROBOT" # TODO DL load via Configuration
}
],
"condition": {
"attrs": topicList
"attrs": [str(topic)]
}
},
"notification": {
"http": {
"url": "http://{}:{}/firos".format(IP, SERVER_PORT) # TODO DL HTTP?
"url": FIROS_NOTIFY_URL
},
"attrs": topicList
"attrs": [str(topic)]
},
"expires": time.strftime("%Y-%m-%dT%H:%M:%S.00Z", time.gmtime(time.time() + 300)) # TODO DL set expire in Config, ISO 8601, 300 secs
"expires": time.strftime("%Y-%m-%dT%H:%M:%S.00Z", time.gmtime(time.time() + 300)) # TODO DL load via Configuration, ISO 8601
# "throttling": 5 # TODO DL Maybe throttle?
}
return json.dumps(struct)


def convertReceivedDataFromCB(self, jsonData):
''' This parses the Input Back into a TypeValue-object via the
Object-Converter. This method is here to uniform the Obejct-Conversions
in CbPublisher and CbSubscriber
def receivedData(self, robotID, topic, jsonData):
# parse the Input and let the TopicHandler (TODO DL remove from there)
# publish the data back into ROS
topic: The topic, which should be converted.
the topic should have "id", "type" and "TOPIC" in it
'''
kv = self.TypeValue()
ObjectFiwareConverter.fiware2Obj(jsonData, kv, setAttr=True, useMetadata=False)
return kv

## DL - Back Parsing Stub Class
class TypeValue(object):
def __init__(self):
pass

# TODO DL remove wrapper!
def subscribe(self, namespace, data_type, robot):
# unused -> data_type
self.subscribeToCB(namespace, robot["publisher"].keys())
class TypeValue(object):
''' A Stub-Object to parse the received data
'''


def _checkResponse(self, response, robTop=None, subID=None, created=False):
''' RESPONSE TODO DL
'''
if not response.ok:
if created:
Log("ERROR", "Could not create subscription for Robot {} and topic {} in Context-Broker".format(robTop[0], robTop[1]))
else:
Log("WARNING", "Could not delete subscription {} from Context-Broker".format(subID))
1 change: 0 additions & 1 deletion scripts/include/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def Log(level, *args):
if level == 'CRITICAL':
_logger.critical(text)
elif level == 'ERROR':
print "error" + text
_logger.error(text)
elif level == 'WARNING':
_logger.warning(text)
Expand Down
Loading

0 comments on commit 6872f73

Please sign in to comment.