Skip to content

Commit

Permalink
[#3488] Enable subtopics for downstream messages with Pub/Sub messagi…
Browse files Browse the repository at this point in the history
…ng infrastructure

Enable subtopic support in case Pub/Sub is used as the messaging infrastructure by checking if the topic contains subtopics and rerouting them to the respective Pub/Sub topic (e.g. MQTT topic event/exampleTenant/exampleDevice/subtopic to Pub/Sub topic exampleTenant.event.subtopic).

Implement a fallback mechanism in case the subtopic does not exist. In case the subtopic does not exist the message would instead get send to the "main" topic (e.g. telemetry or event).

Signed-off-by: matthiask <[email protected]>
  • Loading branch information
mattkaem authored and sophokles73 committed May 24, 2023
1 parent 95d17e0 commit 8eddc19
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,21 +194,43 @@ protected final Future<Void> sendAndWaitForOutcome(

final PubsubMessage pubsubMessage = builder.build();

log.info("sending message to Pub/Sub [topic: {}, registry: {}, deviceId: {}]", topic, tenantId, deviceId);
log.debug("sending message to Pub/Sub [topic: {}, registry: {}, deviceId: {}]", topic, tenantId, deviceId);
logPubSubMessage(currentSpan, pubsubMessage, topic, tenantId);

return getOrCreatePublisher(topic).publish(pubsubMessage)
.onSuccess(recordMessage -> {
logPubSubMessageId(currentSpan, topic, recordMessage);
})
.onFailure(t -> {
logError(currentSpan, topic, tenantId, deviceId, t);
throw new ServerErrorException(tenantId, HttpURLConnection.HTTP_UNAVAILABLE, t);
})
.onSuccess(recordMessage -> logPubSubMessageId(currentSpan, topic, recordMessage))
.recover(t -> retrySendToFallbackTopic(topic, currentSpan, tenantId, deviceId, t, pubsubMessage))
.mapEmpty();

}

private Future<String> retrySendToFallbackTopic(final String topic,
final Span currentSpan,
final String tenantId,
final String deviceId,
final Throwable t,
final PubsubMessage pubsubMessage) {
log.debug("Failed to publish to topic {}", topic);
final String fallback = PubSubMessageHelper.getTopicEndpointFromTopic(topic, tenantId);
if (fallback == null) {
logError(currentSpan, topic, tenantId, deviceId, t);
throw new ServerErrorException(tenantId, HttpURLConnection.HTTP_UNAVAILABLE, t);
}
// delete previous publisher
publisherFactory.closePublisher(topic);

final String fallbackTopic = PubSubMessageHelper.getTopicName(fallback, tenantId);
log.debug("Retry sending message to Pub/Sub using the fallback topic [{}]", fallbackTopic);
// retry publish on fallback topic
return getOrCreatePublisher(fallbackTopic).publish(pubsubMessage)
.onSuccess(recordMessage -> logPubSubMessageId(currentSpan, fallbackTopic, recordMessage))
.onFailure(thr -> {
logError(currentSpan, fallbackTopic, tenantId, deviceId, thr);
throw new ServerErrorException(tenantId, HttpURLConnection.HTTP_UNAVAILABLE, thr);
})
.mapEmpty();
}

/**
* Creates a new <em>OpenTracing</em> span to trace publishing messages to Pub/Sub.
*
Expand Down Expand Up @@ -240,8 +262,8 @@ protected PubSubPublisherClient getOrCreatePublisher(final String topic) {
}

private void logPubSubMessageId(final Span span, final String topic, final String messageId) {
log.info("message published to PubSub [topic: {}, id: {}]", topic, messageId);
span.log("message published to PubSub");
log.debug("message published to Pub/Sub [topic: {}, id: {}]", topic, messageId);
span.log("message published to Pub/Sub");

Tags.HTTP_STATUS.set(span, HttpURLConnection.HTTP_ACCEPTED);
}
Expand Down Expand Up @@ -288,8 +310,8 @@ private Map<String, String> encodePropertiesAsPubSubAttributes(final Map<String,
}

private String getStringEncodedValue(final Object value) {
if (value instanceof String) {
return (String) value;
if (value instanceof String val) {
return val;
}
return Json.encode(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
package org.eclipse.hono.client.pubsub;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -39,8 +42,8 @@ public final class PubSubMessageHelper {
public static final String PUBSUB_PROPERTY_RESPONSE_REQUIRED = "response-required";

/**
* Prefix to use in the Pub/Sub message properties for marking properties of command messages that should be included
* in response messages indicating failure to deliver the command.
* Prefix to use in the Pub/Sub message properties for marking properties of command messages that should be
* included in response messages indicating failure to deliver the command.
*/
public static final String DELIVERY_FAILURE_NOTIFICATION_METADATA_PREFIX = "delivery-failure-notification-metadata";

Expand All @@ -50,8 +53,8 @@ private PubSubMessageHelper() {
/**
* Gets the provider for credentials to use for authenticating to the Pub/Sub service.
*
* @return An optional containing a CredentialsProvider to use for authenticating to the Pub/Sub service or an
* empty optional if the given GoogleCredentials is {@code null}.
* @return An optional containing a CredentialsProvider to use for authenticating to the Pub/Sub service or an empty
* optional if the given GoogleCredentials is {@code null}.
*/
public static Optional<CredentialsProvider> getCredentialsProvider() {
return Optional.ofNullable(getCredentials())
Expand All @@ -70,12 +73,78 @@ private static GoogleCredentials getCredentials() {
/**
* Gets the topic name with the given prefix.
*
* @param topic The endpoint of the topic (e.g. event)
* @param prefix The prefix of the Pub/Sub topic, it's either the tenant ID or the adapter instance ID
* @param topic The endpoint of the topic (e.g. event).
* @param prefix The prefix of the Pub/Sub topic, it's either the tenant ID or the adapter instance ID.
* @return The topic containing the prefix identifier and the endpoint.
*/
public static String getTopicName(final String topic, final String prefix) {
return String.format("%s.%s", prefix, topic);
return getTopicName(topic, prefix, new ArrayList<>());
}

/**
* Gets the topic name with the given prefix and subtopics.
*
* @param topic The endpoint of the topic (e.g. event).
* @param prefix The prefix of the Pub/Sub topic, it's either the tenant ID or the adapter instance ID.
* @param subtopics A list with the subtopics.
* @return The topic containing the prefix identifier, the endpoint and all the subtopics.
*/
public static String getTopicName(final String topic, final String prefix, final List<String> subtopics) {
final StringBuilder topicBuilder = new StringBuilder();
topicBuilder.append(prefix).append(".").append(topic);
for (String subtopic : subtopics) {
topicBuilder.append(".").append(subtopic);
}
return topicBuilder.toString();
}

/**
* Gets the subtopics from the message attributes.
*
* @param attributesMap The attribute map.
* @return An ordered list containing all the subtopics.
*/
public static List<String> getSubtopics(final Map<String, String> attributesMap) {
String origAddress = getAttributesValue(attributesMap, MessageHelper.APP_PROPERTY_ORIG_ADDRESS)
.orElse("");
origAddress = origAddress.startsWith("/") ? origAddress.substring(1) : origAddress;
final List<String> origAddressSplit = new ArrayList<>(Arrays.stream(origAddress.split("/")).toList());
// Subtopics are located starting at the 4th position (e.g. event/tenantId/deviceId/subtopic1/subtopic2/...).
if (origAddressSplit.size() < 4) {
return new ArrayList<>();
}
origAddressSplit.subList(0, 3).clear();
// Remove the last entry if it is a metadata property bag.
if (origAddressSplit.get(origAddressSplit.size() - 1).startsWith("?")) {
origAddressSplit.remove(origAddressSplit.size() - 1);
}
return origAddressSplit;
}

/**
* Gets the subFolder from the list of subtopics.
*
* @param subtopics The list of subtopics.
* @return An string containing the subFolder.
*/
public static String getSubFolder(final List<String> subtopics) {
return String.join("/", subtopics);
}

/**
* Gets the topic endpoint as fallback from the given topic including subtopics.
*
* @param topic The topic containing the tenant and subtopic(s), e.g. tenant.telemetry.subtopic.
* @param tenantId The tenant identifier related to the topic.
* @return The original topic endpoint, e.g. telemetry, or {@code null} if no subtopic is defined.
*/
public static String getTopicEndpointFromTopic(final String topic, final String tenantId) {
final String topicWithoutTenant = topic.replace(tenantId, "");
final String[] fallbackTopics = topicWithoutTenant.split("\\.");
if (fallbackTopics.length <= 2) {
return null;
}
return fallbackTopics[1];
}

/**
Expand All @@ -90,6 +159,7 @@ public static byte[] getPayload(final PubsubMessage message) {
Objects.requireNonNull(message);
return message.getData().toByteArray();
}

/**
* Gets the value of the {@link MessageHelper#APP_PROPERTY_DEVICE_ID} attribute.
*
Expand Down Expand Up @@ -152,7 +222,8 @@ public static Optional<String> getSubject(final Map<String, String> attributesMa
}

/**
* Gets the properties of the attributes which starts with the prefix {@value DELIVERY_FAILURE_NOTIFICATION_METADATA_PREFIX}.
* Gets the properties of the attributes which starts with the prefix
* {@value DELIVERY_FAILURE_NOTIFICATION_METADATA_PREFIX}.
*
* @param attributesMap The attributes map to get the value from.
* @return The properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public void setClientSupplier(final Supplier<PubSubPublisherClient> supplier) {
this.clientSupplier = supplier;
}

@Override
public Future<Void> closePublisher(final String topic) {
return removePublisher(topic);
}

@Override
public Future<Void> closePublisher(final String topic, final String prefix) {
final String topicName = PubSubMessageHelper.getTopicName(topic, prefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
*/
public interface PubSubPublisherFactory {

/**
* Closes the publisher with the given topic if it exists.
* <p>
* This method is expected to be invoked as soon as the publisher is no longer needed.
*
* @param topic The topic of the publisher to remove.
* @return A future that is completed when the close operation completed or a succeeded future if no publisher
* existed with the given topic.
*/
Future<Void> closePublisher(String topic);

/**
* Closes the publisher with the given topic if it exists.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ public class AbstractPubSubBasedMessageSenderTest {

private String topic;

private String topicWithSubtopic;

/**
* Sets up the fixture.
*/
@BeforeEach
public void setUp() {
topic = String.format("%s.%s", TENANT_ID, TOPIC_NAME);
topicWithSubtopic = String.format("%s.%s.%s", TENANT_ID, TOPIC_NAME, "subtopic");
factory = mock(PubSubPublisherFactory.class);
sender = new AbstractPubSubBasedMessageSender(factory, TOPIC_NAME, PROJECT_ID, tracer) {
};
Expand Down Expand Up @@ -113,7 +116,7 @@ public void testSendFailedWhenLifecycleStatusIsNotStarted() {
}

/**
* Verifies that the send method throws a {@link ServerErrorException}.
* Verifies that the send method returns a failed future.
*/
@Test
public void testSendFailed() {
Expand All @@ -122,10 +125,34 @@ public void testSendFailed() {
Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_CONFLICT, "client is null")));
when(factory.getOrCreatePublisher(topic)).thenReturn(client);

assertThrows(
ServerErrorException.class,
() -> sender.sendAndWaitForOutcome(topic, TENANT_ID, DEVICE_ID, Buffer.buffer("test payload"),
Map.of(), NoopSpan.INSTANCE));
final var result = sender.sendAndWaitForOutcome(topic, TENANT_ID, DEVICE_ID, Buffer.buffer("test payload"),
Map.of(), NoopSpan.INSTANCE);

assertThat(result.failed()).isTrue();
result.onFailure(t -> assertThat(t.getClass()).isEqualTo(ServerErrorException.class));
}

/**
* Verifies that the send method returns a failed future on the fallback topic.
*/
@Test
public void testSendFailedOnFallbackTopic() {
final PubSubPublisherClient client = mock(PubSubPublisherClient.class);
when(client.publish(Mockito.any(PubsubMessage.class))).thenReturn(
Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_CONFLICT, "client is null")));
when(factory.getOrCreatePublisher(topicWithSubtopic)).thenReturn(client);

final PubSubPublisherClient fallbackClient = mock(PubSubPublisherClient.class);
when(fallbackClient.publish(Mockito.any(PubsubMessage.class))).thenReturn(
Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_CONFLICT, "client is null")));
when(factory.getOrCreatePublisher(topic)).thenReturn(fallbackClient);

final var result = sender.sendAndWaitForOutcome(topicWithSubtopic, TENANT_ID, DEVICE_ID,
Buffer.buffer("test payload"),
Map.of(), NoopSpan.INSTANCE);

assertThat(result.failed()).isTrue();
result.onFailure(t -> assertThat(t.getClass()).isEqualTo(ServerErrorException.class));
}

/**
Expand Down Expand Up @@ -158,6 +185,39 @@ public void testSendSucceeded() {
assertThat(publishedMessage.getValue().getData()).isEqualTo(bs);
}

/**
* Verifies that the send method succeeded on the fallback topic.
*/
@Test
public void testSucceededOnFallbackTopic() {
final byte[] b = new byte[22];
new Random().nextBytes(b);
final ByteString bs = ByteString.copyFrom(b);

final PubSubPublisherClient client = mock(PubSubPublisherClient.class);
when(client.publish(Mockito.any(PubsubMessage.class))).thenReturn(
Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_CONFLICT, "client is null")));
when(factory.getOrCreatePublisher(topicWithSubtopic)).thenReturn(client);

final PubSubPublisherClient fallbackClient = mock(PubSubPublisherClient.class);
when(fallbackClient.publish(Mockito.any(PubsubMessage.class))).thenReturn(Future.succeededFuture());
when(factory.getOrCreatePublisher(topic)).thenReturn(fallbackClient);

final var result = sender.sendAndWaitForOutcome(
topicWithSubtopic,
TENANT_ID,
DEVICE_ID,
Buffer.buffer(b),
Map.of(),
NoopSpan.INSTANCE);

assertThat(result.succeeded()).isTrue();
final var publishedMessage = ArgumentCaptor.forClass(PubsubMessage.class);
Mockito.verify(client).publish(publishedMessage.capture());
assertThat(publishedMessage.getValue().getOrderingKey()).isEqualTo(DEVICE_ID);
assertThat(publishedMessage.getValue().getData()).isEqualTo(bs);
}

private Map<String, Object> getProperties() {
return Map.of(
"device-Id", DEVICE_ID,
Expand Down
Loading

0 comments on commit 8eddc19

Please sign in to comment.