Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP! Playing around with some interface naming #23

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.confluent.parallelconsumer;

import java.util.function.Consumer;

/**
 * A user's void message-processing callback.
 * <p>
 * Project-specific alias for {@link Consumer}; declares no methods of its own, so any
 * {@code Consumer<T>} lambda or method reference satisfies it. Used as the parameter type of
 * {@code ParallelConsumer#register(Handler)} so user-facing signatures read in domain terms
 * rather than raw JDK functional types.
 *
 * @param <T> type of the element handled (typically a Kafka {@code ConsumerRecord})
 */
@FunctionalInterface // exactly one inherited abstract method: Consumer#accept(T)
public interface Handler<T> extends Consumer<T> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public JStreamParallelEoSStreamProcessor(org.apache.kafka.clients.consumer.Consu
}

@Override
public Stream<ConsumeProduceResult<K, V, K, V>> pollProduceAndStream(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction) {
super.pollAndProduce(userFunction, (result) -> {
public Stream<ConsumeProduceResult<K, V, K, V>> register(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction) {
super.register(userFunction, (result) -> {
log.trace("Wrapper callback applied, sending result to stream. Input: {}", result);
this.userProcessResultsStream.add(result);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ static <KK, VV> JStreamParallelStreamProcessor<KK, VV> createJStreamEosStreamPro
}

/**
* Like {@link ParallelEoSStreamProcessor#pollAndProduce} but instead of callbacks, streams the results instead,
* Like {@link ParallelEoSStreamProcessor#register} but, instead of invoking callbacks, streams the results,
* after the produce result is ack'd by Kafka.
*
* @return a stream of results of applying the function to the polled records
*/
Stream<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> pollProduceAndStream(Function<ConsumerRecord<K, V>,
Stream<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> register(Function<ConsumerRecord<K, V>,
List<ProducerRecord<K, V>>> userFunction);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* @param <K> key consume / produce key type
* @param <V> value consume / produce value type
* @see ParallelEoSStreamProcessor
* @see #poll(Consumer)
* @see #register(Consumer)
*/
// end::javadoc[]
public interface ParallelConsumer<K, V> extends DrainingCloseable {
Expand Down Expand Up @@ -52,7 +52,7 @@ public interface ParallelConsumer<K, V> extends DrainingCloseable {
*
* @param usersVoidConsumptionFunction the function
*/
void poll(Consumer<ConsumerRecord<K, V>> usersVoidConsumptionFunction);
void register(Handler<ConsumerRecord<K, V>> usersVoidConsumptionFunction);

/**
* A simple tuple structure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consume
}

@Override
public void poll(Consumer<ConsumerRecord<K, V>> usersVoidConsumptionFunction) {
public void register(Consumer<ConsumerRecord<K, V>> usersVoidConsumptionFunction) {
Function<ConsumerRecord<K, V>, List<Object>> wrappedUserFunc = (record) -> {
log.trace("asyncPoll - Consumed a record ({}), executing void function...", record.offset());
usersVoidConsumptionFunction.accept(record);
Expand All @@ -264,8 +264,8 @@ public void poll(Consumer<ConsumerRecord<K, V>> usersVoidConsumptionFunction) {

@Override
@SneakyThrows
public void pollAndProduce(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback) {
public void register(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback) {
// wrap user func to add produce function
Function<ConsumerRecord<K, V>, List<ConsumeProduceResult<K, V, K, V>>> wrappedUserFunc = (consumedRecord) -> {
List<ProducerRecord<K, V>> recordListToProduce = userFunction.apply(consumedRecord);
Expand All @@ -290,9 +290,9 @@ public void pollAndProduce(Function<ConsumerRecord<K, V>, List<ProducerRecord<K,
* Produce a message back to the broker.
* <p>
* Implementation uses the blocking API; a performance upgrade is planned for later versions. This is not an issue for the common use
* case ({@link #poll(Consumer)}).
* case ({@link #register(Consumer)}).
*
* @see #pollAndProduce(Function, Consumer)
* @see #register(Function, Consumer)
*/
RecordMetadata produceMessage(ProducerRecord<K, V> outMsg) {
// only needed if not using tx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* Parallel message consumer, which can optionally produce zero or more {@link ProducerRecord} results to be published
* back to Kafka.
*
* @see #pollAndProduce(Function, Consumer)
* @see #register(Function, Consumer)
*/
public interface ParallelStreamProcessor<K, V> extends ParallelConsumer<K, V>, DrainingCloseable {

Expand All @@ -36,8 +36,8 @@ static <KK, VV> ParallelStreamProcessor<KK, VV> createEosStreamProcessor(
* @param callback applied after the produced message is acknowledged by kafka
*/
@SneakyThrows
void pollAndProduce(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback);
void register(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback);

/**
* A simple triple structure to capture the set of coinciding data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void offsetsOpenClose() {

// read some messages
var readByOne = new ArrayList<ConsumerRecord<String, String>>();
asyncOne.poll(x -> {
asyncOne.register(x -> {
log.info("Read by consumer ONE: {}", x);
if (x.value().equals("4")) {
log.info("Throwing fake error for message 4");
Expand Down Expand Up @@ -117,7 +117,7 @@ void offsetsOpenClose() {

// read what we're given
var readByThree = new ArrayList<ConsumerRecord<String, String>>();
asyncThree.poll(x -> {
asyncThree.register(x -> {
log.info("Read by consumer THREE: {}", x.value());
readByThree.add(x);
});
Expand Down Expand Up @@ -167,7 +167,7 @@ void correctOffsetVerySimple() {
asyncOne.subscribe(UniLists.of(topic));

var readByOne = new ArrayList<ConsumerRecord<String, String>>();
asyncOne.poll(readByOne::add);
asyncOne.register(readByOne::add);

// the single message is processed
await().untilAsserted(() -> assertThat(readByOne)
Expand All @@ -186,7 +186,7 @@ void correctOffsetVerySimple() {

// read what we're given
var readByThree = new ArrayList<ConsumerRecord<String, String>>();
asyncThree.poll(x -> {
asyncThree.register(x -> {
log.info("Three read: {}", x.value());
readByThree.add(x);
});
Expand Down Expand Up @@ -227,7 +227,7 @@ void largeNumberOfMessagesSmallOffsetBitmap() {

Set<String> failingMessages = UniSets.of("123", "2345", "8765");
var readByOne = new ConcurrentSkipListSet<String>();
asyncOne.poll(x -> {
asyncOne.register(x -> {
String value = x.value();
if (failingMessages.contains(value)) {
log.info("Throwing fake error for message {}", value);
Expand All @@ -254,7 +254,7 @@ void largeNumberOfMessagesSmallOffsetBitmap() {

// read what we're given
var readByThree = new ConcurrentSkipListSet<String>();
asyncThree.poll(x -> {
asyncThree.register(x -> {
log.trace("Three read: {}", x.value());
readByThree.add(x.value());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void asyncConsumeAndProcess() {
.build();

try (pb) {
async.poll(r -> {
async.register(r -> {
// message processing function
int simulatedCPUMessageProcessingDelay = nextInt(0, 5); // random delay between 0,5
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void load() {

CountDownLatch latch = new CountDownLatch(quantityOfMessagesToProduce);

parallelConsumer.pollAndProduce((rec) -> {
parallelConsumer.register((rec) -> {
ProducerRecord<String, String> mock = mock(ProducerRecord.class);
return UniLists.of(mock);
}, (x) -> {
Expand Down Expand Up @@ -182,7 +182,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) {

Queue<ConsumerRecord<String, String>> processingCheck = new ConcurrentLinkedQueue<ConsumerRecord<String, String>>();

parallelConsumer.pollAndProduce((rec) -> {
parallelConsumer.register((rec) -> {
processingCheck.add(rec);
int rangeOfTimeSimulatedProcessingTakesMs = 5;
long sleepTime = (long) (Math.random() * rangeOfTimeSimulatedProcessingTakesMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected ParallelEoSStreamProcessor initAsyncConsumer(ParallelConsumerOptions p
@Test
public void testStream() {
var latch = new CountDownLatch(1);
Stream<ConsumeProduceResult<String, String, String, String>> streamedResults = streaming.pollProduceAndStream((record) -> {
Stream<ConsumeProduceResult<String, String, String, String>> streamedResults = streaming.register((record) -> {
ProducerRecord mock = mock(ProducerRecord.class);
log.info("Consumed and produced record ({}), and returning a derivative result to produce to output topic: {}", record, mock);
myRecordProcessingAction.apply(record);
Expand All @@ -71,7 +71,7 @@ public void testStream() {
@Test
public void testConsumeAndProduce() {
var latch = new CountDownLatch(1);
var stream = streaming.pollProduceAndStream((record) -> {
var stream = streaming.register((record) -> {
String apply = myRecordProcessingAction.apply(record);
ProducerRecord<String, String> result = new ProducerRecord<>(OUTPUT_TOPIC, "akey", apply);
log.info("Consumed a record ({}), and returning a derivative result record to be produced: {}", record, result);
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testConsumeAndProduce() {
@Test
public void testFlatMapProduce() {
var latch = new CountDownLatch(1);
var myResultStream = streaming.pollProduceAndStream((record) -> {
var myResultStream = streaming.register((record) -> {
String apply1 = myRecordProcessingAction.apply(record);
String apply2 = myRecordProcessingAction.apply(record);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void setupData() {
@Test
@SneakyThrows
public void failingActionNothingCommitted() {
parallelConsumer.poll((ignore) -> {
parallelConsumer.register((ignore) -> {
throw new RuntimeException("My user's function error");
});

Expand Down Expand Up @@ -100,7 +100,7 @@ public void offsetsAreNeverCommittedForMessagesStillInFlightSimplest() {
var startBarrierLatch = new CountDownLatch(1);

// finish processing only msg 1
parallelConsumer.poll((ignore) -> {
parallelConsumer.register((ignore) -> {
startBarrierLatch.countDown();
int offset = (int) ignore.offset();
awaitLatch(locks, offset);
Expand Down Expand Up @@ -158,7 +158,7 @@ public void offsetsAreNeverCommittedForMessagesStillInFlightLong() {

CountDownLatch startLatch = new CountDownLatch(1);

parallelConsumer.poll((ignore) -> {
parallelConsumer.register((ignore) -> {
int offset = (int) ignore.offset();
CountDownLatch latchForMsg = locks.get(offset);
try {
Expand Down Expand Up @@ -239,7 +239,7 @@ public void offsetCommitsAreIsolatedPerPartition() {

List<CountDownLatch> locks = of(msg0Lock, msg1Lock, msg2Lock, msg3Lock);

parallelConsumer.poll((ignore) -> {
parallelConsumer.register((ignore) -> {
int offset = (int) ignore.offset();
CountDownLatch latchForMsg = locks.get(offset);
try {
Expand Down Expand Up @@ -297,7 +297,7 @@ public void controlFlowException() {
parallelConsumer.setClock(mock);

//
parallelConsumer.poll((ignore) -> {
parallelConsumer.register((ignore) -> {
// ignore
});

Expand All @@ -312,7 +312,7 @@ public void controlFlowException() {
public void testVoid() {
int expected = 1;
var msgCompleteBarrier = new CountDownLatch(expected);
parallelConsumer.poll((record) -> {
parallelConsumer.register((record) -> {
waitForInitialBootstrapCommit();
myRecordProcessingAction.apply(record);
msgCompleteBarrier.countDown();
Expand Down Expand Up @@ -437,7 +437,7 @@ public void processInKeyOrder() {
return o;
}).when(consumerSpy).poll(any());

parallelConsumer.poll((ignore) -> {
parallelConsumer.register((ignore) -> {
int offset = (int) ignore.offset();
CountDownLatch latchForMsg = locks.get(offset);
try {
Expand Down Expand Up @@ -567,7 +567,7 @@ public void processInKeyOrderWorkNotReturnedDoesntBreakCommits() {
return records;
}).when(consumerSpy).poll(any());

parallelConsumer.poll((ignore) -> {
parallelConsumer.register((ignore) -> {
int offset = (int) ignore.offset();
CountDownLatch countDownLatch = locks.get(offset);
if (countDownLatch != null) try {
Expand Down Expand Up @@ -616,7 +616,7 @@ public void processInKeyOrderWorkNotReturnedDoesntBreakCommits() {
public void closeAfterSingleMessageShouldBeEventBasedFast() {
var msgCompleteBarrier = new CountDownLatch(1);

parallelConsumer.poll((ignore) -> {
parallelConsumer.register((ignore) -> {
waitForInitialBootstrapCommit();
log.info("Message processed: {} - noop", ignore.offset());
msgCompleteBarrier.countDown();
Expand Down Expand Up @@ -646,10 +646,10 @@ public void closeWithoutRunningShouldBeEventBasedFast() {

@Test
public void ensureLibraryCantBeUsedTwice() {
parallelConsumer.poll(ignore -> {
parallelConsumer.register(ignore -> {
});
assertThatIllegalStateException().isThrownBy(() -> {
parallelConsumer.poll(ignore -> {
parallelConsumer.register(ignore -> {
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void run() {
this.parallelConsumer = setupConsumer();

// tag::example[]
parallelConsumer.poll(record ->
parallelConsumer.register(record ->
log.info("Concurrently processing a record: {}", record)
);
// end::example[]
Expand Down Expand Up @@ -78,7 +78,7 @@ void runPollAndProduce() {
this.parallelConsumer = setupConsumer();

// tag::exampleProduce[]
this.parallelConsumer.pollAndProduce(record -> {
this.parallelConsumer.register(record -> {
var result = processBrokerRecord(record);
ProducerRecord<String, String> produceRecord =
new ProducerRecord<>(outputTopic, "a-key", result.payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/


import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -71,7 +70,7 @@ void startStreams(Topology topology) {
void concurrentProcess() {
setupConsumer();

parallelConsumer.poll(record -> {
parallelConsumer.register(record -> {
log.info("Concurrently processing a record: {}", record);
messageCount.getAndIncrement();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void run() {
getKafkaProducer(), options);

// tag::example[]
var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> {
var resultStream = parallelConsumer.register(record -> {
log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
Map<String, String> params = UniMaps.of("recordKey", record.key(), "payload", record.value());
return new RequestInfo("localhost", "/api", params); // <1>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public JStreamVertxParallelEoSStreamProcessor(org.apache.kafka.clients.consumer.
}

@Override
public Stream<VertxCPResult<K, V>> vertxHttpReqInfoStream(Function<ConsumerRecord<K, V>, RequestInfo> requestInfoFunction) {
public Stream<VertxCPResult<K, V>> register(Function<ConsumerRecord<K, V>, RequestInfo> requestInfoFunction) {

VertxCPResult.VertxCPResultBuilder<K, V> result = VertxCPResult.builder();

Expand All @@ -84,14 +84,14 @@ public Stream<VertxCPResult<K, V>> vertxHttpReqInfoStream(Function<ConsumerRecor
userProcessResultsStream.add(build);
};

super.vertxHttpReqInfo(requestInfoFunctionWrapped, onSendCallBack, (ignore) -> {
super.register(requestInfoFunctionWrapped, onSendCallBack, (ignore) -> {
});

return stream;
}

@Override
public Stream<VertxCPResult<K, V>> vertxHttpRequestStream(BiFunction<WebClient, ConsumerRecord<K, V>, HttpRequest<Buffer>> webClientRequestFunction) {
public Stream<VertxCPResult<K, V>> register(BiFunction<WebClient, ConsumerRecord<K, V>, HttpRequest<Buffer>> webClientRequestFunction) {

VertxCPResult.VertxCPResultBuilder<K, V> result = VertxCPResult.builder();

Expand All @@ -109,7 +109,7 @@ public Stream<VertxCPResult<K, V>> vertxHttpRequestStream(BiFunction<WebClient,
userProcessResultsStream.add(build);
};

super.vertxHttpRequest(requestInfoFunctionWrapped, onSendCallBack, (ignore) -> {
super.register(requestInfoFunctionWrapped, onSendCallBack, (ignore) -> {
});
return stream;
}
Expand All @@ -135,7 +135,7 @@ public Stream<VertxCPResult<K, V>> vertxHttpWebClientStream(
userProcessResultsStream.add(build);
};

super.vertxHttpWebClient(wrappedFunc, onSendCallBack);
super.register(wrappedFunc, onSendCallBack);

return stream;
}
Expand Down
Loading