Java idiomatic client for Pub/Sub Lite Kafka Shim.
Note: This client is a work-in-progress, and may occasionally make backwards-incompatible changes.
If you are using Maven, add this to your pom.xml file:

```xml
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>0.2.3</version>
</dependency>
```

If you are using Gradle without BOM, add this to your dependencies:

```groovy
compile 'com.google.cloud:pubsublite-kafka:0.2.3'
```

If you are using SBT, add this to your dependencies:

```scala
libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "0.2.3"
```
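If your build uses the Google Cloud Libraries BOM, you can let the BOM manage the client version instead of pinning it. A sketch for Maven, assuming `pubsublite-kafka` is managed by the `com.google.cloud:libraries-bom` artifact; the BOM version shown is a placeholder, so check for the latest release:

```xml
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>16.2.1</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>pubsublite-kafka</artifactId>
  </dependency>
</dependencies>
```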
See the Authentication section in the base directory's README.
You will need a Google Cloud Platform Console project with the Pub/Sub Lite Kafka Shim API enabled.
You will need to enable billing to use Google Pub/Sub Lite Kafka Shim.
Follow these instructions to get your project set up. You will also need to set up the local development environment by installing the Google Cloud SDK and running the following commands at the command line:

```shell
gcloud auth login
gcloud config set project [YOUR PROJECT ID]
```
You'll need to obtain the `pubsublite-kafka` library. See the Quickstart section to add `pubsublite-kafka` as a dependency in your code.
Because Google Cloud Pub/Sub Lite provides partitioned zonal data storage with predefined capacity, a large portion of the Kafka Producer/Consumer API can be implemented using Pub/Sub Lite as a backend. The key differences are:

- Pub/Sub Lite does not support transactions. All transaction methods on `Producer<byte[], byte[]>` will raise an exception.
- Producers operate on a single topic, and Consumers on a single subscription.
- ProducerRecord may not specify partition explicitly.
- Consumers may not dynamically create consumer groups (subscriptions).
- `Consumer.offsetsForTimes` and `Consumer.endOffsets` will raise an exception.
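Since consumer groups (subscriptions) cannot be created dynamically, create the topic and subscription ahead of time, for example with the gcloud CLI. A sketch only: the resource names and zone are placeholders, and the exact flags may vary by Cloud SDK version:

```shell
# Create a Lite topic and a subscription attached to it (names are placeholders).
gcloud pubsub lite-topics create my-topic \
  --zone=us-central1-b \
  --partitions=1 \
  --per-partition-bytes=30GiB

gcloud pubsub lite-subscriptions create my-sub \
  --zone=us-central1-b \
  --topic=my-topic
```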
With Pub/Sub Lite, you can use a `Producer<byte[], byte[]>` to publish messages:

```java
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.*;

...

private final static String ZONE = "us-central1-b";
private final static Long PROJECT_NUM = 123L;

...

TopicPath topic = TopicPath.newBuilder()
    .setLocation(CloudZone.parse(ZONE))
    .setProject(ProjectNumber.of(PROJECT_NUM))
    .setName(TopicName.of("my-topic"))
    .build();

ProducerSettings settings = ProducerSettings.newBuilder()
    .setTopicPath(topic)
    .build();

try (Producer<byte[], byte[]> producer = settings.instantiate()) {
  Future<RecordMetadata> sent = producer.send(new ProducerRecord<>(
      topic.toString(),  // Required to be the same topic.
      "key".getBytes(),
      "value".getBytes()
  ));
  RecordMetadata meta = sent.get();
}
```
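The call to `sent.get()` blocks until the publish completes. Kafka's Producer API also offers a non-blocking `send` overload that takes a `Callback`; the fragment below is a sketch continuing the example above (it reuses `producer` and `topic`, and is not taken from the shim's documentation):

```java
// Non-blocking publish: the callback fires when the publish succeeds or fails.
producer.send(
    new ProducerRecord<>(topic.toString(), "key".getBytes(), "value".getBytes()),
    (metadata, exception) -> {
      if (exception != null) {
        exception.printStackTrace();  // the publish failed
      } else {
        System.out.println("Published at offset " + metadata.offset());
      }
    });
```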
With Pub/Sub Lite, you can receive messages using a `Consumer<byte[], byte[]>`:

```java
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;

...

private final static String ZONE = "us-central1-b";
private final static Long PROJECT_NUM = 123L;

...

SubscriptionPath subscription = SubscriptionPath.newBuilder()
    .setLocation(CloudZone.parse(ZONE))
    .setProject(ProjectNumber.of(PROJECT_NUM))
    .setName(SubscriptionName.of("my-sub"))
    .build();

ConsumerSettings settings = ConsumerSettings.newBuilder()
    .setSubscriptionPath(subscription)
    .setPerPartitionFlowControlSettings(FlowControlSettings.builder()
        .setBytesOutstanding(10_000_000)  // 10 MB
        .setMessagesOutstanding(Long.MAX_VALUE)
        .build())
    .setAutocommit(true)
    .build();

try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
  while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
    for (ConsumerRecord<byte[], byte[]> record : records) {
      System.out.println(record.offset() + ": " + record.value());
    }
  }
} catch (WakeupException e) {
  // ignored
}
```
Samples are in the `samples/` directory. The samples' `README.md` has instructions for running the samples.

| Sample | Source Code |
| --- | --- |
| Consumer Example | source code |
| Producer Example | source code |
To get help, follow the instructions in the shared Troubleshooting document.
Pub/Sub Lite Kafka Shim uses gRPC for the transport layer.
Java 8 or above is required for using this client.
This library follows Semantic Versioning. It is currently in major version zero (`0.y.z`), which means that anything may change at any time and the public API should not be considered stable.
Contributions to this library are always welcome and highly encouraged.
See CONTRIBUTING for more information on how to get started.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.
Apache 2.0 - See LICENSE for more information.
| Java Version | Status |
| --- | --- |
| Java 8 | |
| Java 8 OSX | |
| Java 8 Windows | |
| Java 11 | |
Java is a registered trademark of Oracle and/or its affiliates.