Skip to content

Commit

Permalink
fix(citrus-kafka): Fix flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
christophd committed Feb 11, 2025
1 parent 4fe8865 commit be96fdd
Showing 1 changed file with 137 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@

package org.citrusframework.kafka.integration;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.citrusframework.actions.ReceiveMessageAction.Builder.receive;
import static org.citrusframework.actions.SendMessageAction.Builder.send;
import static org.citrusframework.actions.SleepAction.Builder.sleep;
import static org.citrusframework.container.Parallel.Builder.parallel;
import static org.citrusframework.kafka.endpoint.KafkaMessageFilter.kafkaMessageFilter;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.ENDS_WITH;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.STARTS_WITH;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.kafkaHeaderContains;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.kafkaHeaderEquals;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

Expand All @@ -42,6 +31,17 @@
import org.citrusframework.testng.spring.TestNGCitrusSpringSupport;
import org.testng.annotations.Test;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.citrusframework.actions.ReceiveMessageAction.Builder.receive;
import static org.citrusframework.actions.SendMessageAction.Builder.send;
import static org.citrusframework.actions.SleepAction.Builder.sleep;
import static org.citrusframework.container.Parallel.Builder.parallel;
import static org.citrusframework.kafka.endpoint.KafkaMessageFilter.kafkaMessageFilter;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.ENDS_WITH;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.STARTS_WITH;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.kafkaHeaderContains;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.kafkaHeaderEquals;

@Test(singleThreaded = true)
public class KafkaEndpointJavaIT extends TestNGCitrusSpringSupport {

Expand All @@ -61,20 +61,20 @@ public void findKafkaEvent_headerEquals_citrus_DSL() {
var value = "Bilbo";

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
);

then(
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderEquals(key, value))
.build()
)
.getMessageBuilderSupport()
.body(body)
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderEquals(key, value))
.build()
)
.getMessageBuilderSupport()
.body(body)
);
}

Expand All @@ -87,20 +87,20 @@ public void findKafkaEvent_headerContains_citrus_DSL() {
var value = "Frodo";

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
);

then(
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderContains(key, "odo"))
.build()
)
.getMessageBuilderSupport()
.body(body)
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderContains(key, "odo"))
.build()
)
.getMessageBuilderSupport()
.body(body)
);
}

Expand All @@ -113,26 +113,26 @@ public void findKafkaEvent_headerStartsWith_citrus_DSL() {
var value = "Galadriel";

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
);

then(
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(
KafkaMessageByHeaderSelector.builder()
.key(key)
.value("Gala")
.valueMatchingStrategy(STARTS_WITH)
.build()
)
.build()
)
.getMessageBuilderSupport()
.body(body)
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(
KafkaMessageByHeaderSelector.builder()
.key(key)
.value("Gala")
.valueMatchingStrategy(STARTS_WITH)
.build()
)
.build()
)
.getMessageBuilderSupport()
.body(body)
);
}

Expand All @@ -145,26 +145,26 @@ public void findKafkaEvent_headerEndsWith_citrus_DSL() {
var value = "Celeborn";

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
);

then(
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(
KafkaMessageByHeaderSelector.builder()
.key(key)
.value("born")
.valueMatchingStrategy(ENDS_WITH)
.build()
)
.build()
)
.getMessageBuilderSupport()
.body(body)
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(
KafkaMessageByHeaderSelector.builder()
.key(key)
.value("born")
.valueMatchingStrategy(ENDS_WITH)
.build()
)
.build()
)
.getMessageBuilderSupport()
.body(body)
);
}

Expand All @@ -177,20 +177,20 @@ public void findKafkaEvent_nothingFound_noMatch_citrus_DSL() {
var value = "Elrond";

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
);

ThrowableAssert.ThrowingCallable receiver = () -> then(
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderEquals(key, "Arwen"))
.build()
)
.getMessageBuilderSupport()
.body(body)
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderEquals(key, "Arwen"))
.build()
)
.getMessageBuilderSupport()
.body(body)
);

assertThatThrownBy(receiver)
Expand All @@ -208,22 +208,22 @@ public void findKafkaEvent_nothingFound_outsideLookbackWindow_citrus_DSL() {
var value = "Gimli";

given(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
);

when(sleep().seconds(2));

ThrowableAssert.ThrowingCallable receiver = () -> then(
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderEquals(key, value))
.build()
)
.getMessageBuilderSupport()
.body(body)
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderEquals(key, value))
.build()
)
.getMessageBuilderSupport()
.body(body)
);

assertThatThrownBy(receiver)
Expand All @@ -240,25 +240,25 @@ public void findKafkaEvent_duplicateEntriesFound_citrus_DSL() {
var key = "Name";

given(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, "Gandalf the Grey"))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, "Gandalf the Grey"))
);

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, "Gandalf the White"))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, "Gandalf the White"))
);

ThrowableAssert.ThrowingCallable receiver = () -> then(
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderContains(key, "Gandalf"))
.build()
)
.getMessageBuilderSupport()
.body(body)
receive(kafkaWithRandomConsumerGroupEndpoint)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderContains(key, "Gandalf"))
.build()
)
.getMessageBuilderSupport()
.body(body)
);

assertThatThrownBy(receiver)
Expand All @@ -276,47 +276,53 @@ public void findKafkaEvent_headerEquals_java_DSL() {
var value = "Gollum";

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
);

then(
kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, value)
.body(body)
kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, value)
.body(body)
);
}

@Test
@CitrusTest
public void shutdown_afterTimeout_isThreadSafe() {
KafkaEndpoint kafkaEndpoint = KafkaEndpoint.builder()
.randomConsumerGroup(true)
.topic("names")
.useThreadSafeConsumer()
.build();

var body = "shutdown_afterTimeout_isThreadSafe";

var key = "Name";
var value = "Aragorn";

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
send(kafkaEndpoint)
.message(new KafkaMessage(body).setHeader(key, value))
);

ThrowableAssert.ThrowingCallable receiver = () -> then(
receive(kafkaWithRandomConsumerGroupEndpoint)
.timeout(2_000)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderEquals(key, "Samwise"))
.pollTimeout(Duration.ofSeconds(3)) // Note that pollTimeout > overall receive timeout
.build()
)
.getMessageBuilderSupport()
.body(body)
receive(kafkaEndpoint)
.timeout(2_000)
.selector(
kafkaMessageFilter()
.eventLookbackWindow(Duration.ofSeconds(1L))
.kafkaMessageSelector(kafkaHeaderEquals(key, "Samwise"))
.pollTimeout(Duration.ofSeconds(3)) // Note that pollTimeout > overall receive timeout
.build()
)
.getMessageBuilderSupport()
.body(body)
);

assertThatThrownBy(receiver)
.isInstanceOf(TestCaseFailedException.class)
.hasRootCauseInstanceOf(TimeoutException.class)
.hasMessageContaining("Action timeout after 2000 milliseconds. Failed to receive message on endpoint: 'KafkaEndpointJavaIT'");
.hasMessageContaining("Action timeout after 2000 milliseconds. Failed to receive message on endpoint: 'names'");
}

@Test
Expand All @@ -331,23 +337,23 @@ public void threadSafetyOfKafkaConsumer_onParallelAccess() {
var brother2 = "Elrohir";

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, brother1))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, brother1))
);

when(
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, brother2))
send(kafkaWithRandomConsumerGroupEndpoint)
.message(new KafkaMessage(body).setHeader(key, brother2))
);

then(
parallel()
.actions(
kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, brother1)
.body(body),
kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, brother2)
.body(body)
)
parallel()
.actions(
kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, brother1)
.body(body),
kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, brother2)
.body(body)
)
);
}
}

0 comments on commit be96fdd

Please sign in to comment.