Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update CARLA Ambassador to use XMLRPC Client #165

Merged
merged 12 commits into from
Nov 7, 2023
34 changes: 34 additions & 0 deletions co-simulation/NOTICE-THIRD-PARTY.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,30 @@ Apache Commons Text (1.6)
* Source: https://git-wip-us.apache.org/repos/asf?p=commons-text


Apache WebServices Common Utilities (1.0.2)

* License: The Apache Software License, Version 2.0
* Maven artifact: `org.apache.ws.commons.util:ws-commons-util:1.0.2`
* Project: http://ws.apache.org/commons/util
* Source: https://svn.apache.org/viewcvs.cgi/webservices/commons/trunk/util


Apache XML-RPC Client Library (3.1.3)

* License: The Apache Software License, Version 2.0
* Maven artifact: `org.apache.xmlrpc:xmlrpc-client:3.1.3`
* Project: http://ws.apache.org/xmlrpc/xmlrpc-client/
* Source: https://svn.apache.org/viewvc/webservices/xmlrpc/tags/xmlrpc-3.1.3/xmlrpc-client


Apache XML-RPC Common Library (3.1.3)

* License: The Apache Software License, Version 2.0
* Maven artifact: `org.apache.xmlrpc:xmlrpc-common:3.1.3`
* Project: http://ws.apache.org/xmlrpc/xmlrpc-common/
* Source: https://svn.apache.org/viewvc/webservices/xmlrpc/tags/xmlrpc-3.1.3/xmlrpc-common


Commons Compiler (2.7.5)

* License: New BSD License
Expand Down Expand Up @@ -355,3 +379,13 @@ Woodstox (5.1.0)
* Maven artifact: `com.fasterxml.woodstox:woodstox-core:5.1.0`
* Project: https://github.com/FasterXML/woodstox
* Source: https://github.com/FasterXML/woodstox


XML Commons External Components XML APIs (1.0.b2)

* License: The Apache Software License, Version 2.0
* Maven artifact: `xml-apis:xml-apis:1.0.b2`
* Project: http://xml.apache.org/commons/#external
* Source: https://svn.apache.org/viewvc/xml/commons/tags/xml-commons-1_0_b2


Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@
"updateInterval": 100,
"carlaUE4Path": "/opt/carla/",
"bridgePath": "/opt/carma-simulation/scenarios/Town04/carla; bridge.sh",
"carlaConnectionPort": 8913
"carlaConnectionPort": 8913,
"carlaCDASimAdapterUrl":"http://127.0.0.1:8090/RPC2"

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"updateInterval": 100,
"carlaUE4Path": "/opt/carla/",
"bridgePath": "/opt/carma-simulation/scenarios/Town04_test/carla; bridge.sh",
"carlaConnectionPort": 8913
"carlaConnectionPort": 8913,
"carlaCDASimAdapterUrl":"http://127.0.0.1:8090/RPC2"
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
import com.google.common.collect.Lists;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
import org.apache.xmlrpc.XmlRpcException;
import org.eclipse.mosaic.fed.carla.carlaconnect.CarlaConnection;
import org.eclipse.mosaic.fed.carla.carlaconnect.CarlaXmlRpcClient;
import org.eclipse.mosaic.fed.carla.config.CarlaConfiguration;
import org.eclipse.mosaic.fed.sumo.traci.constants.CommandSimulationControl;
import org.eclipse.mosaic.fed.sumo.traci.writer.ListTraciWriter;
import org.eclipse.mosaic.fed.sumo.traci.writer.StringTraciWriter;
import org.eclipse.mosaic.interactions.application.*;
import org.eclipse.mosaic.interactions.detector.DetectedObjectInteraction;
import org.eclipse.mosaic.interactions.detector.DetectorRegistration;
import org.eclipse.mosaic.lib.objects.detector.DetectedObject;
import org.eclipse.mosaic.lib.util.ProcessLoggingThread;
import org.eclipse.mosaic.lib.util.objects.ObjectInstantiation;
import org.eclipse.mosaic.rti.TIME;
Expand Down Expand Up @@ -110,6 +114,8 @@ public class CarlaAmbassador extends AbstractFederateAmbassador {
*/
private final PriorityBlockingQueue<CarlaV2xMessageReception> carlaV2xInteractionQueue = new PriorityBlockingQueue<>();

private List<DetectorRegistration> registeredDetectors = new ArrayList<>();

/**
* Creates a new {@link CarlaAmbassador} object.
*
Expand Down Expand Up @@ -210,13 +216,13 @@ public void initialize(long startTime, long endTime) throws InternalFederateExce
//initialize CarlaXmlRpcClient
//set the connected server URL
try{
URL xmlRpcServerUrl = new URL("http://127.0.0.1:8090/RPC2");
URL xmlRpcServerUrl = new URL(carlaConfig.carlaCDASimAdapterUrl);
carlaXmlRpcClient = new CarlaXmlRpcClient(xmlRpcServerUrl);
carlaXmlRpcClient.initialize();
}
catch (MalformedURLException m)
{
log.error("Errors occurred with {}", m.getMessage());
carlaXmlRpcClient.closeConnection();
}
// Start the CARLA simulator
startCarlaLocal();
Expand Down Expand Up @@ -351,15 +357,29 @@ public synchronized void processTimeAdvanceGrant(long time) throws InternalFeder
nextTimeStep += carlaConfig.updateInterval * TIME.MILLI_SECOND;
isSimulationStep = false;
}
// TODO: What is this. Why are we request a time advance based on this counter and
// what is it counting. It is labeled as counting the times we attempt to connect to
// CARLA but it seems to increment every time processTimeAdvanceGrant is called
rti.requestAdvanceTime(nextTimeStep + this.executedTimes, 0, (byte) 2);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kjrush Do you have any information why were are requesting time advance like this with a continuously incremented counter? If you have any information about executedTimes and why its used in our requestAdvanceTime.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a quick skim of the rest of this PR, but so far focused on this.

I don't really see what purpose this executed times has, to be honest. It might just be entirely unnecessary. For the record, it doesn't increment infinitely. It's actually reset later on in the code inside the receiveInteraction(SimulationStepResponse) method. The interactions being received there seem to be generated whenever SUMO provides a new TraCI update, so it should be frequently, so this should never go above a small value, maybe 10 at most.

But even 10 doesn't make much sense, considering this requestAdvanceTime arg is in milliseconds, and there's no multiplicative factor applied to this executedTimes value (unlike carlaConfig.updateInterval above), so it's small enough it probably doesn't actually do anything. I imagine it was implemented like this with the goal that if we don't have any fresh data from sumo, we should wait longer to delay the CARLA updates until we do have fresh data. But I'm not sure that's needed and it seems unlikely this is actually accomplishing that.

I think you were right to be questioning about this line, but in the end I'm not sure it matters one way or the other. I'd be in favor of just removing it, but keeping this line in our mental bookmarks in case we run into issues with the ambassador. If we hit those issues we'll have this top of mind to re-investigate.

this.executedTimes++;

//call CarlaXmlRpcClient to ask for data whenever time advances
carlaXmlRpcClient.requestCarlaList();

List<DetectedObjectInteraction> detectedObjectInteractions = new ArrayList<DetectedObjectInteraction>();
// Get all detections from all currently registered detectors.
for (DetectorRegistration registration: registeredDetectors ) {
DetectedObject[] detections = carlaXmlRpcClient.getDetectedObjects( registration.getInfrastructureId() , registration.getDetector().getSensorId());
for (DetectedObject detected: detections) {
detectedObjectInteractions.add(new DetectedObjectInteraction(time, detected));
}
}
// trigger all detection interactions
for (DetectedObjectInteraction detectionInteraction: detectedObjectInteractions) {
this.rti.triggerInteraction(detectionInteraction);
}
} catch (IllegalValueException e) {
log.error("Error during advanceTime(" + time + ")", e);
throw new InternalFederateException(e);
}
catch (XmlRpcException e) {
log.error("Failed to connect to CARLA Adapter : ", e);
carlaXmlRpcClient.closeConnection();
}
}

Expand Down Expand Up @@ -494,6 +514,25 @@ public void processInteraction(Interaction interaction) {
} else if (interaction.getTypeId().equals(CarlaV2xMessageReception.TYPE_ID)) {
this.receiveInteraction((CarlaV2xMessageReception) interaction);
}
else if (interaction.getTypeId().equals(DetectorRegistration.TYPE_ID)) {
this.receiveInteraction((DetectorRegistration) interaction);
}
}

/**
* Method to call XMLRPC method to create sensor on reception of DetectionRegistration interactions.
* @param interaction Interaction triggered by Ambassadors attempting to create sensors in CARLA.
*/
private void receiveInteraction(DetectorRegistration interaction) {
try {
carlaXmlRpcClient.createSensor(interaction);
registeredDetectors.add(interaction);
}
catch(XmlRpcException e) {
log.error("Error occurred attempting to create sensor : {}\n{}", interaction.getDetector(), e);
carlaXmlRpcClient.closeConnection();
}

}

/**
Expand All @@ -508,7 +547,7 @@ private void receiveInteraction(CarlaTraciResponse interaction) {
carlaConnection.getDataOutputStream().write(interaction.getResult());
}
} catch (Exception e) {
log.error("error occurs during process carla traci response interaction: " + e.getMessage());
log.error("error occurs during process carla traci response interaction: {} ", e.getMessage());
}
}

Expand All @@ -526,7 +565,7 @@ private void receiveInteraction(SimulationStepResponse interaction) {
}

} catch (Exception e) {
log.error("error occurs during process simulation step response interaction: " + e.getMessage());
log.error("error occurs during process simulation step response interaction: {}", e.getMessage());
}
}

Expand All @@ -536,7 +575,7 @@ private void receiveInteraction(SimulationStepResponse interaction) {
* @param interaction CarlaV2xMessageReception interaction
*/
private void receiveInteraction(CarlaV2xMessageReception interaction) {
log.info("{} received V2x message: {}.", interaction.getReceiverID(), interaction.getMessage().toString());
log.info("{} received V2x message: {}.", interaction.getReceiverID(), interaction.getMessage());

interactionQueue.add(interaction);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,44 @@
package org.eclipse.mosaic.fed.carla.carlaconnect;

import java.net.URL;
import java.util.Arrays;
import java.util.List;

import org.apache.xmlrpc.XmlRpcException;
import org.apache.xmlrpc.client.XmlRpcClient;
import org.apache.xmlrpc.client.XmlRpcClientConfigImpl;
import org.eclipse.mosaic.interactions.detector.DetectorRegistration;
import org.eclipse.mosaic.lib.objects.detector.DetectedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

/**
* This is a class uses xmlrpc to connect with CARLA CDASim adapter service
* This is a class that uses xmlrpc to connect with CARLA CDASim Adapter. It includes calls
* to dynamically create sensors and get detected objects from created sensors.
*/
public class CarlaXmlRpcClient{

boolean isConnected;
private String registeredFunction = "test.echo";
private boolean isConnected;
private static final String CREATE_SENSOR = "create_simulated_semantic_lidar_sensor";
private static final String GET_DETECTED_OBJECTS = "get_detected_objects";

private XmlRpcClient client;
private URL xmlRpcServerUrl;
private final Logger log = LoggerFactory.getLogger(this.getClass());


public CarlaXmlRpcClient(URL xmlRpcServerUrl) {
initialize(xmlRpcServerUrl);
this.xmlRpcServerUrl = xmlRpcServerUrl;
}

/**
* This method is used to send a stop message to the python server side to shut down server from there
*/
public void closeConnection()
{
log.info("carla connection server closing");
isConnected = false;
}



/**
* need to getting a URL to connect to from ambassador
* Initialize XmlRpcClient.
* @param xmlRpcServerUrl
*/
public void initialize(URL xmlRpcServerUrl)
public void initialize()
{
XmlRpcClientConfigImpl config = new XmlRpcClientConfigImpl();
config.setServerURL(xmlRpcServerUrl);
Expand All @@ -62,34 +62,53 @@ public void initialize(URL xmlRpcServerUrl)
isConnected = true;
}

/**
* Calls CARLA CDA Sim Adapter create_sensor XMLRPC method and logs sensor ID of created sensor.
* @param registration DetectorRegistration interaction used to create sensor.
* @throws XmlRpcException if XMLRPC call fails or connection is lost.
*/
public void createSensor(DetectorRegistration registration) throws XmlRpcException{
List<Double> location = Arrays.asList(registration.getDetector().getLocation().getX(), registration.getDetector().getLocation().getY(), registration.getDetector().getLocation().getZ());
List<Double> orientation = Arrays.asList(registration.getDetector().getOrientation().getPitch(), registration.getDetector().getOrientation().getRoll(), registration.getDetector().getOrientation().getYaw());

if (isConnected) {
Object[] params = new Object[]{registration.getInfrastructureId(), registration.getDetector().getSensorId(), location, orientation};
Object result = client.execute(CREATE_SENSOR, params);
log.info((String)result);
}
else {
log.warn("XMLRpcClient is not connected to CARLA Adapter!");
}
}
/**
* This method uses xmlrpc to connect with CARLA CDASim adapter service
* @throws XmlRpcException
*/
public void requestCarlaList()
{
if(!isConnected)
{

try {
throw new XmlRpcException("Server is not connected!");
}
catch (XmlRpcException e)
{
log.error("Server is not connected! {}", e.getMessage());
}
* Calls CARLA CDA Sim Adapter get_detected_objects XMLRPC method and returns an array of DetectedObject.
* @param infrastructureId String infrastructure ID of sensor to get detections from.
* @param sensorId String sensor ID of sensor to get detections from
* @return DetectedObject[] from given sensor.
* @throws XmlRpcException if XMLRPC call fails or connection is lost.
*/
public DetectedObject[] getDetectedObjects(String infrastructureId ,String sensorId) throws XmlRpcException{
if (isConnected) {
Object[] params = new Object[]{infrastructureId, sensorId};
Object result = client.execute(GET_DETECTED_OBJECTS, params);
log.debug("Detections from infrastructure {} sensor {} : {}", infrastructureId, sensorId, result);
String jsonResult = (String)result;
Gson gson = new Gson();
DetectedObject[] parsedMessage = gson.fromJson(jsonResult,
DetectedObject[].class);
return parsedMessage;
}
try{
Object[] params = new Object[]{"Test " + java.time.LocalDateTime.now()};
Object result = client.execute(registeredFunction, params);
log.info((String)result);
else {
throw new XmlRpcException("XMLRpcClient is not connected to CARLA Adapter!");

}
catch (XmlRpcException x)
{
log.error("Errors occurred with xmlrpc connection {}", x.getMessage());
closeConnection();
}

}
/**
* Method to set isConnected field to false. Does not actually close the underlying http connection session but
* is used to avoid repeated timeouts/exceptions on misconfiguration of XMLRPC client.
*/
public void closeConnection() {
log.warn("Closing XML RPC Client connection in CARLA Ambassador!");
isConnected = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,10 @@ public class CarlaConfiguration implements Serializable {
* Carla connection port
*/
public int carlaConnectionPort;
/**
* URL where CARLACDASimAdapter XMLRPC Server is hosted
*/
public String carlaCDASimAdapterUrl;


}
Loading