Skip to content

Commit

Permalink
ServiceBus: Allowing for null contentType and other optional values (#…
Browse files Browse the repository at this point in the history
…138)

* Allowing for null contentType and other optional values

* spotless formatting

* small static import changes
  • Loading branch information
GoMati-MU authored Oct 10, 2024
1 parent fbb9b9a commit c6044c1
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import java.util.Map;

import java.util.Optional;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
Expand Down Expand Up @@ -79,23 +81,39 @@ public static SourceRecord mapSingleServiceBusMessage(ServiceBusReceivedMessage
}

private static Struct createStructFromServiceBusMessage(final ServiceBusReceivedMessage serviceBusMessage) {
return new Struct(VALUE_SCHEMA)
.put(DELIVERY_COUNT, serviceBusMessage.getDeliveryCount())
.put(ENQUEUED_TIME_UTC, serviceBusMessage.getEnqueuedTime().toEpochSecond())
.put(CONTENT_TYPE, serviceBusMessage.getContentType())
.put(LABEL, AzureServiceBusSourceConnector.class.getSimpleName())
.put(CORRELATION_ID, serviceBusMessage.getCorrelationId())
.put(PARTITION_KEY, serviceBusMessage.getPartitionKey())
.put(REPLY_TO, serviceBusMessage.getReplyTo())
.put(REPLY_TO_SESSION_ID, serviceBusMessage.getReplyToSessionId())
.put(DEAD_LETTER_SOURCE, serviceBusMessage.getDeadLetterSource())
.put(TIME_TO_LIVE, serviceBusMessage.getTimeToLive().toMillis())
.put(LOCKED_UNTIL_UTC, serviceBusMessage.getLockedUntil().toEpochSecond())
.put(SEQUENCE_NUMBER, serviceBusMessage.getSequenceNumber())
.put(SESSION_ID, serviceBusMessage.getSessionId())
.put(LOCK_TOKEN, serviceBusMessage.getLockToken())
.put(MESSAGE_BODY, serviceBusMessage.getBody().toBytes())
.put(GET_TO, serviceBusMessage.getTo());
Struct struct =
new Struct(VALUE_SCHEMA)
.put(DELIVERY_COUNT, serviceBusMessage.getDeliveryCount())
.put(ENQUEUED_TIME_UTC, serviceBusMessage.getEnqueuedTime().toEpochSecond())
.put(LABEL, AzureServiceBusSourceConnector.class.getSimpleName())
.put(TIME_TO_LIVE, serviceBusMessage.getTimeToLive().toMillis())
.put(MESSAGE_BODY, serviceBusMessage.getBody().toBytes())
.put(GET_TO, serviceBusMessage.getTo());

addOptionalSchemaValues(struct, serviceBusMessage);

return struct;
}

private static void addOptionalSchemaValues(Struct struct, ServiceBusReceivedMessage serviceBusMessage) {
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getContentType(), CONTENT_TYPE);
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getCorrelationId(), CORRELATION_ID);
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getPartitionKey(), PARTITION_KEY);
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getReplyTo(), REPLY_TO);
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getReplyToSessionId(), REPLY_TO_SESSION_ID);
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getDeadLetterSource(), DEAD_LETTER_SOURCE);
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getSequenceNumber(), SEQUENCE_NUMBER);
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getSessionId(), SESSION_ID);
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getLockToken(), LOCK_TOKEN);
addOptionalSchemaValueIfExists(struct, serviceBusMessage.getTo(), GET_TO);

Optional.ofNullable(serviceBusMessage.getLockedUntil())
.ifPresent(lu -> struct.put(LOCKED_UNTIL_UTC, lu.toEpochSecond()));

}

private static void addOptionalSchemaValueIfExists(Struct struct, Object value, Field field) {
Optional.ofNullable(value).ifPresent(val -> struct.put(field, val));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public class ServiceBusValueSchemaField {
static final Field ENQUEUED_TIME_UTC =
new Field(SchemaFieldConstants.ENQUEUED_TIME_UTC, 1, Schema.INT64_SCHEMA);
static final Field CONTENT_TYPE =
new Field(SchemaFieldConstants.CONTENT_TYPE, 2, Schema.STRING_SCHEMA);
new Field(SchemaFieldConstants.CONTENT_TYPE, 2, Schema.OPTIONAL_STRING_SCHEMA);
static final Field LABEL =
new Field(SchemaFieldConstants.LABEL, 3, Schema.STRING_SCHEMA);
new Field(SchemaFieldConstants.LABEL, 3, Schema.OPTIONAL_STRING_SCHEMA);
static final Field CORRELATION_ID =
new Field(SchemaFieldConstants.CORRELATION_ID, 4, Schema.OPTIONAL_STRING_SCHEMA);
static final Field MESSAGE_PROPERTIES =
Expand All @@ -56,7 +56,7 @@ public class ServiceBusValueSchemaField {
static final Field LOCK_TOKEN =
new Field(SchemaFieldConstants.LOCK_TOKEN, 14, Schema.OPTIONAL_STRING_SCHEMA);
static final Field MESSAGE_BODY =
new Field(SchemaFieldConstants.MESSAGE_BODY, 15, Schema.BYTES_SCHEMA);
new Field(SchemaFieldConstants.MESSAGE_BODY, 15, Schema.OPTIONAL_BYTES_SCHEMA);
static final Field GET_TO =
new Field(SchemaFieldConstants.GET_TO, 16, Schema.OPTIONAL_STRING_SCHEMA);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import com.azure.core.util.BinaryData;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
Expand Down Expand Up @@ -84,6 +83,50 @@ void mapSingleSourceRecordWitAllParameters() {

}

@Test
void mapSingleSourceRecordAllowsForOptionalScemaFieldsToBeNull() {
//given
ServiceBusReceivedMessage busMessage = prepareMessageBusWithOnlyRequiredFields();
AzureServiceBusPartitionKey partitionKey = new AzureServiceBusPartitionKey(OUTPUT_TOPIC, PARTITION_KEY);
AzureServiceBusOffsetMarker busOffsetMarker = new AzureServiceBusOffsetMarker(SEQUENCE_NUMBER);

//when
SourceRecord sourceRecord =
ServiceBusToSourceRecordMapper.mapSingleServiceBusMessage(busMessage, OUTPUT_TOPIC, partitionKey,
busOffsetMarker);

//then
assertThat(sourceRecord)
.returns(TIME_NOW.toEpochSecond(), from(SourceRecord::timestamp))
.returns(partitionKey, from(SourceRecord::sourcePartition))
.returns(null, from(SourceRecord::kafkaPartition))
.returns(busOffsetMarker, from(SourceRecord::sourceOffset))
.returns(OUTPUT_TOPIC, from(SourceRecord::topic))
.returns(Schema.STRING_SCHEMA, from(SourceRecord::keySchema))
.returns(ServiceBusToSourceRecordMapper.VALUE_SCHEMA, from(SourceRecord::valueSchema));

Struct valueStruct = (Struct) sourceRecord.value();

assertThat(valueStruct)
.returns(DELIVERY_COUNT, from(v -> v.get(ServiceBusValueSchemaField.DELIVERY_COUNT)))
.returns(TIME_NOW.toEpochSecond(), from(v -> v.get(ServiceBusValueSchemaField.ENQUEUED_TIME_UTC)))
.returns(null, from(v -> v.get(ServiceBusValueSchemaField.CONTENT_TYPE)))
.returns(LABEL, from(v -> v.get(ServiceBusValueSchemaField.LABEL)))
.returns(null, from(v -> v.get(ServiceBusValueSchemaField.CORRELATION_ID)))
.returns(null, from(v -> v.get(ServiceBusValueSchemaField.PARTITION_KEY)))
.returns(null, from(v -> v.get(ServiceBusValueSchemaField.REPLY_TO)))
.returns(null, from(v -> v.get(ServiceBusValueSchemaField.REPLY_TO_SESSION_ID)))
.returns(null, from(v -> v.get(ServiceBusValueSchemaField.DEAD_LETTER_SOURCE)))
.returns(TIME_TO_LIVE.toMillis(), from(v -> v.get(ServiceBusValueSchemaField.TIME_TO_LIVE)))
.returns(null, from(v -> v.get(ServiceBusValueSchemaField.LOCKED_UNTIL_UTC)))
.returns(SEQUENCE_NUMBER, from(v -> v.get(ServiceBusValueSchemaField.SEQUENCE_NUMBER)))
.returns(null, from(v -> v.get(ServiceBusValueSchemaField.SESSION_ID)))
.returns(null, from(v -> v.get(ServiceBusValueSchemaField.LOCK_TOKEN)))
.returns(MESSAGE_BODY, from(v -> v.get(ServiceBusValueSchemaField.MESSAGE_BODY)))
.returns(DELIVERY_COUNT, from(v -> v.get(ServiceBusValueSchemaField.DELIVERY_COUNT)));

}

private void assertMappedStructValues(Struct valueStruct) {
assertThat(valueStruct)
.returns(DELIVERY_COUNT, from(v -> v.get(ServiceBusValueSchemaField.DELIVERY_COUNT)))
Expand All @@ -110,22 +153,48 @@ private ServiceBusReceivedMessage prepareMessageBusWithAllConsumedFields() {
BinaryData bodyBinary = mock(BinaryData.class);
when(bodyBinary.toBytes()).thenReturn(MESSAGE_BODY);

Mockito.when(busReceivedMessage.getMessageId()).thenReturn(MESSAGE_ID);
Mockito.when(busReceivedMessage.getDeliveryCount()).thenReturn(DELIVERY_COUNT);
Mockito.when(busReceivedMessage.getEnqueuedTime()).thenReturn(TIME_NOW);
Mockito.when(busReceivedMessage.getContentType()).thenReturn(CONTENT_TYPE);
Mockito.when(busReceivedMessage.getCorrelationId()).thenReturn(CORRELATION_ID);
Mockito.when(busReceivedMessage.getPartitionKey()).thenReturn(PARTITION_KEY);
Mockito.when(busReceivedMessage.getReplyTo()).thenReturn(REPLY_TO);
Mockito.when(busReceivedMessage.getReplyToSessionId()).thenReturn(REPLY_TO_SESSION_ID);
Mockito.when(busReceivedMessage.getDeadLetterSource()).thenReturn(DEAD_LETTER_SOURCE);
Mockito.when(busReceivedMessage.getTimeToLive()).thenReturn(TIME_TO_LIVE);
Mockito.when(busReceivedMessage.getLockedUntil()).thenReturn(TIME_NOW);
Mockito.when(busReceivedMessage.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER);
Mockito.when(busReceivedMessage.getSessionId()).thenReturn(SESSION_ID);
Mockito.when(busReceivedMessage.getLockToken()).thenReturn(LOCK_TOKEN);
Mockito.when(busReceivedMessage.getBody()).thenReturn(bodyBinary);
Mockito.when(busReceivedMessage.getTo()).thenReturn(GET_TO);
when(busReceivedMessage.getMessageId()).thenReturn(MESSAGE_ID);
when(busReceivedMessage.getDeliveryCount()).thenReturn(DELIVERY_COUNT);
when(busReceivedMessage.getEnqueuedTime()).thenReturn(TIME_NOW);
when(busReceivedMessage.getContentType()).thenReturn(CONTENT_TYPE);
when(busReceivedMessage.getCorrelationId()).thenReturn(CORRELATION_ID);
when(busReceivedMessage.getPartitionKey()).thenReturn(PARTITION_KEY);
when(busReceivedMessage.getReplyTo()).thenReturn(REPLY_TO);
when(busReceivedMessage.getReplyToSessionId()).thenReturn(REPLY_TO_SESSION_ID);
when(busReceivedMessage.getDeadLetterSource()).thenReturn(DEAD_LETTER_SOURCE);
when(busReceivedMessage.getTimeToLive()).thenReturn(TIME_TO_LIVE);
when(busReceivedMessage.getLockedUntil()).thenReturn(TIME_NOW);
when(busReceivedMessage.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER);
when(busReceivedMessage.getSessionId()).thenReturn(SESSION_ID);
when(busReceivedMessage.getLockToken()).thenReturn(LOCK_TOKEN);
when(busReceivedMessage.getBody()).thenReturn(bodyBinary);
when(busReceivedMessage.getTo()).thenReturn(GET_TO);

return busReceivedMessage;
}

private ServiceBusReceivedMessage prepareMessageBusWithOnlyRequiredFields() {
ServiceBusReceivedMessage busReceivedMessage = mock(ServiceBusReceivedMessage.class);

BinaryData bodyBinary = mock(BinaryData.class);
when(bodyBinary.toBytes()).thenReturn(MESSAGE_BODY);

when(busReceivedMessage.getMessageId()).thenReturn(MESSAGE_ID);
when(busReceivedMessage.getDeliveryCount()).thenReturn(DELIVERY_COUNT);
when(busReceivedMessage.getEnqueuedTime()).thenReturn(TIME_NOW);
when(busReceivedMessage.getContentType()).thenReturn(null);
when(busReceivedMessage.getCorrelationId()).thenReturn(null);
when(busReceivedMessage.getPartitionKey()).thenReturn(null);
when(busReceivedMessage.getReplyTo()).thenReturn(null);
when(busReceivedMessage.getReplyToSessionId()).thenReturn(null);
when(busReceivedMessage.getDeadLetterSource()).thenReturn(null);
when(busReceivedMessage.getTimeToLive()).thenReturn(TIME_TO_LIVE);
when(busReceivedMessage.getLockedUntil()).thenReturn(null);
when(busReceivedMessage.getSequenceNumber()).thenReturn(SEQUENCE_NUMBER);
when(busReceivedMessage.getSessionId()).thenReturn(null);
when(busReceivedMessage.getLockToken()).thenReturn(null);
when(busReceivedMessage.getBody()).thenReturn(bodyBinary);
when(busReceivedMessage.getTo()).thenReturn(null);

return busReceivedMessage;
}
Expand Down

0 comments on commit c6044c1

Please sign in to comment.