diff --git a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java index 5b1dd7f105..6e604ccf5b 100644 --- a/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java +++ b/udmis/src/main/java/com/google/bos/udmi/service/messaging/impl/SimpleMqttPipe.java @@ -153,8 +153,34 @@ private String makeMqttTopic(Bundle bundle) { } private String makeTopic(Envelope envelope) { - return format("/r/%s/d/%s/t/%s/f/%s", - envelope.deviceRegistryId, envelope.deviceId, envelope.subType, envelope.subFolder); + return format("/r/%s/d/%s/t/%s/f/%s/g/%s", envelope.deviceRegistryId, envelope.deviceId, + envelope.subType, envelope.subFolder, envelope.gatewayId); + } + + private Map parseEnvelopeTopic(String topic) { + // 0/1/2 /3/4 /5/6 [/7/8 [/9/10 ]] + // /r/REGISTRY/d/DEVICE/t/TYPE[/f/FOLDER[/g/GATEWAY]] + String[] parts = topic.split("/", 12); + if (parts.length < 7 || parts.length > 11) { + throw new RuntimeException("Unexpected topic length: " + topic); + } + Envelope envelope = new Envelope(); + checkState(Strings.isNullOrEmpty(parts[0]), "non-empty prefix"); + checkState("r".equals(parts[1]), "expected registries"); + envelope.deviceRegistryId = parts[2]; + checkState("d".equals(parts[3]), "expected devices"); + envelope.deviceId = parts[4]; + checkState("t".equals(parts[5]), "expected type"); + envelope.subType = SubType.fromValue(parts[6]); + if (parts.length >= 8) { + checkState("f".equals(parts[7]), "expected type"); + envelope.subFolder = SubFolder.fromValue(parts[8]); + } + if (parts.length >= 10) { + checkState("g".equals(parts[9]), "expected gateway"); + envelope.gatewayId = parts[10]; + } + return toStringMap(envelope); } private void subscribeToMessages() { @@ -220,28 +246,6 @@ public void messageArrived(String topic, MqttMessage message) { } } - private Map parseEnvelopeTopic(String topic) { - // 0/1/2 /3/4 /5/6 [/7/8] - // /r/REGISTRY/d/DEVICE/t/TYPE[/f/FOLDER] - String[] parts = topic.split("/", 10); - if (parts.length < 7 || parts.length > 9) { - throw new RuntimeException("Unexpected topic length: " + topic); - } - Envelope envelope = new Envelope(); - checkState(Strings.isNullOrEmpty(parts[0]), "non-empty prefix"); - checkState("r".equals(parts[1]), "expected registries"); - envelope.deviceRegistryId = parts[2]; - checkState("d".equals(parts[3]), "expected devices"); - envelope.deviceId = parts[4]; - checkState("t".equals(parts[5]), "expected type"); - envelope.subType = SubType.fromValue(parts[6]); - if (parts.length >= 8) { - checkState("f".equals(parts[7]), "expected type"); - envelope.subFolder = SubFolder.fromValue(parts[8]); - } - return toStringMap(envelope); - } - @Override void resetForTest() { super.resetForTest();