Skip to content

Commit

Permalink
more testing for ordered with start time or start sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Nov 15, 2024
1 parent 968d3c5 commit ddf7e8d
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public void testIterableConsumer() throws Exception {
int stopCount = 500;
// create the consumer then use it
try (IterableConsumer consumer = consumerContext.iterate()) {
_testIterable(js, stopCount, consumer, tsc.subject());
_testIterableBasic(js, stopCount, consumer, tsc.subject());
}

// coverage
Expand Down Expand Up @@ -327,12 +327,12 @@ public void testOrderedIterableConsumerBasic() throws Exception {
OrderedConsumerConfiguration occ = new OrderedConsumerConfiguration().filterSubject(tsc.subject());
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
try (IterableConsumer consumer = occtx.iterate()) {
_testIterable(js, stopCount, consumer, tsc.subject());
_testIterableBasic(js, stopCount, consumer, tsc.subject());
}
});
}

private static void _testIterable(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException {
private static void _testIterableBasic(JetStream js, int stopCount, IterableConsumer consumer, String subject) throws InterruptedException {
AtomicInteger count = new AtomicInteger();
Thread consumeThread = new Thread(() -> {
try {
Expand Down Expand Up @@ -662,14 +662,14 @@ public void testOrderedBehaviorNext() throws Exception {

// New pomm factory in place before each subscription is made
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
orderedNext(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));
_testOrderedNext(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));

((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
orderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
_testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime));

((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedNextTestDropSimulator::new;
orderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
_testOrderedNext(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2));
});
}
Expand All @@ -683,7 +683,7 @@ private static ZonedDateTime getStartTimeFirstMessage(JetStream js, TestingStrea
return startTime;
}

private static void orderedNext(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws IOException, JetStreamApiException, InterruptedException, JetStreamStatusCheckedException {
private static void _testOrderedNext(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws IOException, JetStreamApiException, InterruptedException, JetStreamStatusCheckedException {
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
// Loop through the messages to make sure I get stream sequence 1 to 6
while (expectedStreamSeq <= 6) {
Expand Down Expand Up @@ -733,21 +733,21 @@ public void testOrderedBehaviorFetch() throws Exception {
// Set the Consumer Sequence For Stream Sequence 3 statically for ease
CS_FOR_SS_3 = 3;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
orderedFetch(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));
_testOrderedFetch(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));

CS_FOR_SS_3 = 2;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
orderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
_testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime));

CS_FOR_SS_3 = 2;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
orderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
_testOrderedFetch(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2));
});
}

private static void orderedFetch(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception {
private static void _testOrderedFetch(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception {
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
FetchConsumeOptions fco = FetchConsumeOptions.builder().maxMessages(6).expiresIn(1000).build();
try (FetchConsumer fcon = occtx.fetch(fco)) {
Expand Down Expand Up @@ -791,21 +791,21 @@ public void testOrderedBehaviorIterable() throws Exception {
// Set the Consumer Sequence For Stream Sequence 3 statically for ease
CS_FOR_SS_3 = 3;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
orderedIterate(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));
_testOrderedIterate(sctx, 1, new OrderedConsumerConfiguration().filterSubject(tsc.subject()));

CS_FOR_SS_3 = 2;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
orderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
_testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartTime).startTime(startTime));

CS_FOR_SS_3 = 2;
((NatsJetStream)js)._pullOrderedMessageManagerFactory = PullOrderedTestDropSimulator::new;
orderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
_testOrderedIterate(sctx, 2, new OrderedConsumerConfiguration().filterSubject(tsc.subject())
.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(2));
});
}

private static void orderedIterate(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception {
private static void _testOrderedIterate(StreamContext sctx, int expectedStreamSeq, OrderedConsumerConfiguration occ) throws Exception {
OrderedConsumerContext occtx = sctx.createOrderedConsumer(occ);
try (IterableConsumer icon = occtx.iterate()) {
// Loop through the messages to make sure I get stream sequence 1 to 5
Expand Down

0 comments on commit ddf7e8d

Please sign in to comment.