Skip to content

Commit

Permalink
Add auto-reconnect to ROS for the robot browser (#76)
Browse files Browse the repository at this point in the history
* Address the bug where initial ROS comms won't go through

* Removed checkROSConnection

* Fix from test cases

* Wait until key topics have a publisher

* Copy getPublishers in for backwards compatibility
  • Loading branch information
hello-amal authored Jul 18, 2024
1 parent 6580963 commit 18b18a4
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 33 deletions.
1 change: 1 addition & 0 deletions launch/web_interface.launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ def generate_launch_description():
]
),
"authenticate": "false",
"call_services_in_new_thread": "true",
}.items(),
)
ld.add_action(rosbridge_launch)
Expand Down
12 changes: 4 additions & 8 deletions src/pages/robot/tsx/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ connection = new WebRTCConnection({
onMessage: handleMessage,
onConnectionEnd: disconnectFromRobot,
});

robot.connect().then(() => {
robot.setOnRosConnectCallback(async () => {
robot.subscribeToVideo({
topicName: "/navigation_camera/image_raw/rotated/compressed",
callback: navigationStream.updateImage,
Expand All @@ -84,7 +83,10 @@ robot.connect().then(() => {
robot.getJointLimits();

connection.joinRobotRoom();

return Promise.resolve();
});
robot.connect();

function handleSessionStart() {
connection.removeTracks();
Expand Down Expand Up @@ -280,12 +282,6 @@ function handleMessage(message: WebRTCMessage) {
case "getHasBetaTeleopKit":
robot.getHasBetaTeleopKit();
case "moveToPregrasp":
console.log(
"moveToPregrasp",
message.scaled_x,
message.scaled_y,
message.horizontal,
);
robot.executeMoveToPregraspGoal(
message.scaled_x,
message.scaled_y,
Expand Down
195 changes: 176 additions & 19 deletions src/pages/robot/tsx/robot.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const moveToPregraspActionName = "/move_to_pregrasp";

export class Robot extends React.Component {
private ros: ROSLIB.Ros;
private readonly rosURL = "wss://localhost:9090";
private rosReconnectTimerID?: ReturnType<typeof setTimeout>;
private onRosConnectCallback?: () => Promise<void>;
private jointLimits: { [key in ValidJoints]?: [number, number] } = {};
private jointState?: ROSJointState;
private poseGoal?: ROSLIB.ActionGoal;
Expand Down Expand Up @@ -98,31 +101,135 @@ export class Robot extends React.Component {
this.stretchToolCallback = props.stretchToolCallback;
}

setOnRosConnectCallback(callback: () => Promise<void>) {
this.onRosConnectCallback = callback;
}

async connect(): Promise<void> {
console.log("Connecting to ROS...");
this.ros = new ROSLIB.Ros({
// set this to false to use the new service interface to
// tf2_web_republisher. true is the default and means roslibjs
// will use the action interface
groovyCompatibility: false,
url: "wss://localhost:9090",
url: this.rosURL,
});

return new Promise<void>((resolve, reject) => {
this.ros.on("connection", async () => {
this.ros.on("connection", async () => {
console.log("Connected to ROS.");
// We check that bidirectional communications with ROS are working, and
// that some key topics have publishers (which are indicative of all
// required nodes being loaded). This is because ROSbridge matches the
// QoS of publishers, so without publishers there is likely to be a
// QoS mismatch.
let isConnected = await this.checkROSConnection();
if (isConnected) {
await this.onConnect();
resolve();
});
this.ros.on("error", (error) => {
reject(error);
});
if (this.onRosConnectCallback) await this.onRosConnectCallback();
} else {
console.log("Required ROS nodes are not yet loaded. Reconnecting.");
this.reconnect();
}
});
this.ros.on("error", (error) => {
console.log("Error connecting to ROS:", error);
this.reconnect();
});

this.ros.on("close", () => {
reject("Connection to websocket has been closed.");
});
this.ros.on("close", () => {
console.log("Connection to ROS has been closed.");
this.reconnect();
});
}

async reconnect(interval_ms: number = 1000) {
if (!this.rosReconnectTimerID) {
this.rosReconnectTimerID = setTimeout(() => {
console.log("Reconnecting to ROS...");
this.ros.close();
this.ros.connect(this.rosURL);
this.rosReconnectTimerID = undefined;
}, interval_ms);
}
}

async checkROSConnection(
required_topics: string[] = [
"/camera/color/image_raw/rotated/compressed",
"/gripper_camera/image_raw/cropped/compressed",
"/navigation_camera/image_raw/rotated/compressed",
"/stretch/joint_states",
],
timeout_ms: number = 5000,
): Promise<boolean> {
// For backwards compatibility with older versions of roslibjs, use the
// local copy of getPublishers if the ROS object does not have it.
let getPublishers = this.getPublishers.bind(this);
if (this.ros.getPublishers !== undefined) {
getPublishers = this.ros.getPublishers.bind(this.ros);
}

let numRequiredTopicsWithPublisher = 0;
let isResolved = false;
console.log("Checking ROS connection...");
return new Promise(async (resolve) => {
if (this.ros.isConnected) {
for (let topic of required_topics) {
// Verify that the topic has a publisher
getPublishers(
topic,
// Success callback
(publishers: string[]) => {
if (publishers.length > 0) {
console.log("Got a publisher on topic", topic);
numRequiredTopicsWithPublisher += 1;
if (numRequiredTopicsWithPublisher === required_topics.length) {
console.log("Got publishers on all required topics.");
isResolved = true;
resolve(true);
}
} else {
console.log("No publisher on topic", topic);
isResolved = true;
resolve(false);
}
},
// Failure callback
(error) => {
console.log(
"Error in getting publishers for topic",
topic,
error,
);
isResolved = true;
resolve(false);
},
);
}
resolve(
await new Promise<boolean>((resolve) =>
setTimeout(() => {
if (!isResolved) {
if (numRequiredTopicsWithPublisher < required_topics.length) {
console.log(
"Timed out with at least one required topic not having publishers.",
);
resolve(false);
}
}
}, timeout_ms),
),
);
} else {
console.log("ROS is not connected.");
isResolved = true;
resolve(false);
}
});
}

async onConnect() {
console.log("onConnect");
this.subscribeToJointState();
this.subscribeToJointLimits();
this.subscribeToBatteryState();
Expand Down Expand Up @@ -205,7 +312,13 @@ export class Robot extends React.Component {
this.subscriptions.push(jointLimitsTopic);

jointLimitsTopic.subscribe((msg: ROSJointState) => {
msg.name.forEach((name, idx) => {
msg.name.forEach((name: string, idx: number) => {
console.log(
"Got joint limit for",
name,
msg.position[idx],
msg.velocity[idx],
);
if (name == "joint_arm") name = "wrist_extension";
this.jointLimits[name] = [msg.position[idx], msg.velocity[idx]];
});
Expand Down Expand Up @@ -248,6 +361,7 @@ export class Robot extends React.Component {
}

getStretchTool() {
console.log("Getting stretch tool", this.ros.isConnected);
// NOTE: This information can also come from the /tool topic.
// However, we only need it once, so opt for a parameter.
this.stretchToolParam = new ROSLIB.Param({
Expand Down Expand Up @@ -292,14 +406,23 @@ export class Robot extends React.Component {
}

getJointLimits() {
console.log("Getting joint limits");
let getJointLimitsService = new ROSLIB.Service({
ros: this.ros,
name: "/get_joint_states",
serviceType: "std_srvs/Trigger",
});

var request = new ROSLIB.ServiceRequest({});
getJointLimitsService.callService(request, () => {});
getJointLimitsService.callService(
request,
() => {
console.log("Got joint limits service succeeded");
},
(error) => {
console.log("Got joint limits service failed", error);
},
);
}

subscribeToActionResult(
Expand Down Expand Up @@ -514,8 +637,8 @@ export class Robot extends React.Component {
request,
(response: boolean) => {
response
? console.log("Enabled realsense depth sensing")
: console.log("Disabled realsense depth sensing");
? console.log("Successfully set realsense depth sensing to", toggle)
: console.log("Failed to set realsense depth sensing to", toggle);
},
);
}
Expand All @@ -526,8 +649,8 @@ export class Robot extends React.Component {
request,
(response: boolean) => {
response
? console.log("Enabled gripper depth sensing")
: console.log("Disabled gripper depth sensing");
? console.log("Successfully set gripper depth sensing to", toggle)
: console.log("Failed to set gripper depth sensing to", toggle);
},
);
}
Expand All @@ -538,8 +661,8 @@ export class Robot extends React.Component {
request,
(response: boolean) => {
response
? console.log("Enabled expanded gripper")
: console.log("Disabled expanded gripper");
? console.log("Successfully set expanded gripper to", toggle)
: console.log("Failed to set expanded gripper to", toggle);
},
);
}
Expand Down Expand Up @@ -981,4 +1104,38 @@ export class Robot extends React.Component {
// Send an empty string and override behavior 1 to interrupt the current speech
this.playTextToSpeech("", 1);
}

/**
* Copied from https://github.com/hello-vinitha/roslibjs/pull/1 and
* https://github.com/RobotWebTools/roslibjs/pull/760 , included here for
* backwards compatibility.
*/
getPublishers(
topic: string,
callback: (publishers: string[]) => void,
failedCallback: (message: any) => void,
) {
var publishersClient = new ROSLIB.Service({
ros: this.ros,
name: "/rosapi/publishers",
serviceType: "rosapi_msgs/srv/Publishers",
});

var request = new ROSLIB.ServiceRequest({
topic: topic,
});
if (typeof failedCallback === "function") {
publishersClient.callService(
request,
function (result: any) {
callback(result.publishers);
},
failedCallback,
);
} else {
publishersClient.callService(request, function (result) {
callback(result.publishers);
});
}
}
}
18 changes: 13 additions & 5 deletions src/pages/robot/tsx/videostreams.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export class VideoStream extends React.Component<VideoStreamProps> {
className?: string;
outputVideoStream?: MediaStream;
aspectRatio: any;
started: boolean;

constructor(props: VideoStreamProps) {
super(props);
Expand All @@ -58,6 +59,7 @@ export class VideoStream extends React.Component<VideoStreamProps> {
this.video.setAttribute("width", this.width.toString());
this.video.setAttribute("height", this.height.toString());
this.outputVideoStream = new MediaStream();
this.started = false;

this.updateImage = this.updateImage.bind(this);
}
Expand Down Expand Up @@ -85,7 +87,7 @@ export class VideoStream extends React.Component<VideoStreamProps> {
);
}
if (!this.imageReceived) {
let { width, height, data } = jpeg.decode(
let { width, height } = jpeg.decode(
Uint8Array.from(atob(message.data), (c) => c.charCodeAt(0)),
true,
);
Expand All @@ -109,10 +111,16 @@ export class VideoStream extends React.Component<VideoStreamProps> {
}

start() {
if (!this.canvas.current) throw "Video stream canvas null";
this.outputVideoStream = this.canvas.current.captureStream(this.fps);
this.video.srcObject = this.outputVideoStream;
this.drawVideo();
if (!this.started) {
console.log("Starting video stream", this.streamName);
if (!this.canvas.current) throw "Video stream canvas null";
this.outputVideoStream = this.canvas.current.captureStream(this.fps);
this.video.srcObject = this.outputVideoStream;
this.drawVideo();
this.started = true;
} else {
console.log("Video stream already started", this.streamName);
}
}

render() {
Expand Down
2 changes: 1 addition & 1 deletion src/shared/webrtcconnections.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class WebRTCConnection extends React.Component {
this.socket = io();

this.socket.on("connect", () => {
console.log("socket connected");
console.log("WebRTCConnection socket connected");
});

this.socket.on("join", (room: string) => {
Expand Down

0 comments on commit 18b18a4

Please sign in to comment.