From f140aaa61c7133c9afacfdf53ecafb70ca63ff72 Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Date: Mon, 16 Dec 2024 20:52:57 -0800
Subject: [PATCH 1/8] MINOR: remove ZK from code base

---
 pom.xml                                        |  19 ++--
 .../WikipediaFeedAvroLambdaExampleTest.java    |  36 +++---
 .../kafka/EmbeddedSingleNodeKafkaCluster.java  | 107 ++++++++----------
 .../examples/streams/kafka/KafkaEmbedded.java  | 101 +++++++----------
 .../util/MicroserviceTestUtils.java            |   3 +-
 .../streams/zookeeper/ZooKeeperEmbedded.java   |  71 ------------
 6 files changed, 117 insertions(+), 220 deletions(-)
 delete mode 100644 src/test/java/io/confluent/examples/streams/zookeeper/ZooKeeperEmbedded.java

diff --git a/pom.xml b/pom.xml
index 8e148e12cb..82fd25c3b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,14 +124,6 @@
       <version>${scala.version}</version>
     </dependency>
-
-    <dependency>
-      <groupId>com.101tec</groupId>
-      <artifactId>zkclient</artifactId>
-      <version>0.9</version>
-    </dependency>
-
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-reload4j</artifactId>
@@ -181,6 +173,11 @@
       <artifactId>jackson-databind</artifactId>
       <version>${jackson.version}</version>
     </dependency>
+    <dependency>
+      <groupId>jakarta.xml.bind</groupId>
+      <artifactId>jakarta.xml.bind-api</artifactId>
+      <version>4.0.1</version>
+    </dependency>

From 83c4ae1318853baf64998eee5933a92e53fe8d42 Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Date: Tue, 17 Dec 2024 14:19:28 -0800
Subject: [PATCH 3/8] Update JDK to 17

---
 .semaphore/semaphore.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml
index b1ac914c31..c16d6d6def 100644
--- a/.semaphore/semaphore.yml
+++ b/.semaphore/semaphore.yml
@@ -155,7 +155,7 @@ after_pipeline:
       - name: SonarQube
         commands:
           - checkout
-          - sem-version java 11
+          - sem-version java 17
          - artifact pull workflow target-AMD
           - artifact pull workflow target-ARM
           - emit-sonarqube-data --run_only_sonar_scan

From 16607c981881c1db419e964539913ebbddd62929 Mon Sep 17 00:00:00 2001
From: "Matthias J. Sax"
Date: Wed, 18 Dec 2024 21:03:20 -0800
Subject: [PATCH 4/8] fix tests

---
 .../streams/kafka/EmbeddedSingleNodeKafkaCluster.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/test/java/io/confluent/examples/streams/kafka/EmbeddedSingleNodeKafkaCluster.java b/src/test/java/io/confluent/examples/streams/kafka/EmbeddedSingleNodeKafkaCluster.java
index c02e267896..442ac4af4b 100644
--- a/src/test/java/io/confluent/examples/streams/kafka/EmbeddedSingleNodeKafkaCluster.java
+++ b/src/test/java/io/confluent/examples/streams/kafka/EmbeddedSingleNodeKafkaCluster.java
@@ -132,6 +132,10 @@ protected void after() {
    * Stops the cluster.
    */
   public void stop() {
+    if (!running) {
+      log.info("Confluent is already stopped");
+      return;
+    }
     log.info("Stopping Confluent");
     try {
       if (schemaRegistry != null) {

From 29375cc64daf1e6c293ed3149b9156a05b9198a3 Mon Sep 17 00:00:00 2001
From: "Matthias J.
Sax" Date: Thu, 19 Dec 2024 10:13:50 -0800 Subject: [PATCH 5/8] Remove algebird example --- .../examples/streams/algebird/CMSStore.scala | 271 -------------- .../streams/algebird/CMSStoreBuilder.scala | 75 ---- .../algebird/CMSStoreChangeLogger.scala | 52 --- .../algebird/ProbabilisticCounter.scala | 27 -- .../streams/algebird/TopCMSSerde.scala | 81 ----- ...bilisticCountingScalaIntegrationTest.scala | 122 ------- .../streams/algebird/CMSStoreTest.scala | 340 ------------------ .../streams/algebird/TopCMSSerdeTest.scala | 80 ----- 8 files changed, 1048 deletions(-) delete mode 100644 src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala delete mode 100644 src/main/scala/io/confluent/examples/streams/algebird/CMSStoreBuilder.scala delete mode 100644 src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala delete mode 100644 src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala delete mode 100644 src/main/scala/io/confluent/examples/streams/algebird/TopCMSSerde.scala delete mode 100644 src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala delete mode 100644 src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala delete mode 100644 src/test/scala/io/confluent/examples/streams/algebird/TopCMSSerdeTest.scala diff --git a/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala b/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala deleted file mode 100644 index cc38cdcb84..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams.algebird - -import com.twitter.algebird.{CMSHasher, TopCMS, TopPctCMS} -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.processor.{StateStore, StateStoreContext} -import org.apache.kafka.streams.state.StateSerdes - -/** - * An in-memory store that leverages the Count-Min Sketch implementation of - * [[https://github.com/twitter/algebird Twitter Algebird]]. - * - * This store allows you to probabilistically count items of type T with a - * [[https://en.wikipedia.org/wiki/Count%E2%80%93min_sketch Count-Min Sketch]] data structure. - * Here, the counts returned by the store will be approximate counts, i.e. estimations, because a - * Count-Min Sketch trades slightly inaccurate counts for greatly reduced space utilization - * (however, the estimation error is mathematically proven to be bounded). - * With probability at least `1 - delta`, this estimate is within `eps * N` of the true frequency - * (i.e., `true frequency <= estimate <= true frequency + eps * N`), where `N` is the total number - * of items counted ("seen" in the input) so far (cf. [[CMSStore#totalCount]]). 
- * - * A traditional Count-Min Sketch is a fixed-size data structure that is essentially an array of - * counters of a particular width (derived from the parameter `eps`) and depth (derived from the - * parameter `delta`). The CMS variant used in this store, [[TopPctCMS]], additionally tracks the - * so-called "heavy hitters" among the counted items (i.e. the items with the largest counts) based - * on a percentage threshold; the size of heavy hitters is still bounded, however, hence the total - * size of the [[TopPctCMS]] data structure is still fixed. - * - * =Fault-tolerance= - * - * This store supports changelogging its state to Kafka and is thus fault-tolerant. Every time the - * store is flushed (cf. [[org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG]]) the - * underlying CMS data structure is written to the store's changelog topic. For many use cases - * this approach should be sufficiently efficient because the absolute size of a CMS is typically - * rather small (a few KBs up to a megabyte, depending on the CMS settings, which are determined by - * e.g. your error bound requirements for approximate counts). - * - * =Usage= - * - * Note: Twitter Algebird is best used with Scala, so all the examples below are in Scala, too. - * - * In a Kafka Streams application, you'd typically create this store as such: - * - * {{{ - * val builder: StreamsBuilder = new StreamsBuilder() - * - * // In this example, we create a store for type [[String]]. - * // It's recommended to reduce Kafka's log segment size for the changelogs of CMS stores, which - * // you can do by passing the respective Kafka setting to the CMSStoreBuilder via `withLoggingEnabled()`. - * builder.addStateStore(new CMSStoreBuilder[String]("my-cms-store-name", Serdes.String())) - * }}} - * - * Then you'd use the store within a [[org.apache.kafka.streams.processor.Processor]] or a - * [[org.apache.kafka.streams.kstream.Transformer]] similar to: - * - * {{{ - * class ProbabilisticCounter extends Transformer[Array[Byte], String, KeyValue[String, Long]] { - * - * private var cmsState: CMSStore[String] = _ - * private var processorContext: ProcessorContext = _ - * - * override def init(processorContext: ProcessorContext): Unit = { - * this.processorContext = processorContext - * cmsState = this.processorContext.getStateStore("my-cms-store-name").asInstanceOf[CMSStore[String]] - * } - * - * override def transform(key: Array[Byte], value: String): KeyValue[String, Long] = { - * // Count the record value, think: "+ 1" - * cmsState.put(value) - * - * // Emit the latest count estimate for the record value - * KeyValue.pair[String, Long](value, cmsState.get(value)) - * } - * - * override def punctuate(l: Long): KeyValue[String, Long] = null - * - * override def close(): Unit = {} - * } - * }}} - * - * @param name The name of this store instance - * @param loggingEnabled Whether or not changelogging (fault-tolerance) is enabled for this store. - * @param delta CMS parameter: A bound on the probability that a query estimate does not - * lie within some small interval (an interval that depends on `eps`) around - * the truth. - * See [[TopPctCMS]] and [[com.twitter.algebird.CMSMonoid]]. - * @param eps CMS parameter: One-sided error bound on the error of each point query, - * i.e. frequency estimate. - * See [[TopPctCMS]] and [[com.twitter.algebird.CMSMonoid]]. - * @param seed CMS parameter: A seed to initialize the random number generator used to - * create the pairwise independent hash functions. 
Typically you do not - * need to change this. - * See [[TopPctCMS]] and [[com.twitter.algebird.CMSMonoid]]. - * @param heavyHittersPct CMS parameter: A threshold for finding heavy hitters, i.e., items that - * appear at least (heavyHittersPct * totalCount) times in the stream. - * Every item that appears at least `(heavyHittersPct * totalCount)` times - * is included, and with probability `p >= 1 - delta`, no item whose count - * is less than `(heavyHittersPct - eps) * totalCount` is included. - * This also means that this parameter is an upper bound on the number of - * heavy hitters that will be tracked: the set of heavy hitters contains at - * most `1 / heavyHittersPct` elements. For example, if - * `heavyHittersPct=0.01` (or 0.25), then at most `1 / 0.01 = 100` items - * or `1 / 0.25 = 4` items) will be tracked/returned as heavy hitters. - * This parameter can thus control the memory footprint required for - * tracking heavy hitters. - * See [[TopPctCMS]] and [[com.twitter.algebird.TopPctCMSMonoid]]. - * @tparam T The type used to identify the items to be counted with the CMS. For example, if - * you want to count the occurrence of user names, you could use count user names - * directly with `T=String`; alternatively, you could map each username to a unique - * numeric ID expressed as a `Long`, and then count the occurrences of those `Long`s with - * a CMS of type `T=Long`. Note that such a mapping between the items of your problem - * domain and their identifiers used for counting via CMS should be bijective. - * We require a [[CMSHasher]] context bound for `K`, see [[CMSHasher]] for available - * implicits that can be imported. - * See [[com.twitter.algebird.CMSMonoid]] for further information. - */ -class CMSStore[T: CMSHasher](override val name: String, - val loggingEnabled: Boolean = true, - val delta: Double = 1E-10, - val eps: Double = 0.001, - val seed: Int = 1, - val heavyHittersPct: Double = 0.01) - extends StateStore { - - private val cmsMonoid = TopPctCMS.monoid[T](eps, delta, seed, heavyHittersPct) - - /** - * The "storage backend" of this store. - * - * Needs proper initializing in case the store's changelog is empty. - */ - private var cms: TopCMS[T] = cmsMonoid.zero - - private var timestampOfLastStateStoreUpdate: Long = 0L - - private var changeLogger: CMSStoreChangeLogger[Integer, TopCMS[T]] = _ - - /** - * The record key used to write to the state's changelog. - * - * This key can be a constant because: - * - * 1. We always write the full CMS when writing to the changelog. - * 2. A CMS does not retain information about which items were counted, i.e. it does not track - * information about the keyspace (in the case of this store, the only information about the - * keyspace are the heavy hitters); so, unless we opted for a different approach than (1) - * above, we cannot leverage keyspace information anyways. - * 3. We use a [[CMSStoreChangeLogger]] that uses a stream task's - * [[org.apache.kafka.streams.processor.TaskId]] to identify the changelog partition to write to. - * Thus only one particular stream task will ever be writing to that changelog partition. - * 4. When restoring from the changelog, a stream task will read only its own (one) changelog - * partition. - * - * In other words, we can hardcode the changelog key because only the "right" stream task will be - * (a) writing to AND (b) reading from the respective partition of the changelog. 
- */ - private[algebird] val changelogKey = 42 - - /** - * For unit testing - */ - private[algebird] def cmsFrom(items: Seq[T]): TopCMS[T] = cmsMonoid.create(items) - - /** - * For unit testing - */ - private[algebird] def cmsFrom(item: T): TopCMS[T] = cmsMonoid.create(item) - - @volatile private var open: Boolean = false - - /** - * Initializes this store, including restoring the store's state from its changelog. - */ - override def init(context: StateStoreContext, root: StateStore): Unit = { - val serdes = new StateSerdes[Integer, TopCMS[T]]( - name, - Serdes.Integer(), - TopCMSSerde[T]) - changeLogger = new CMSStoreChangeLogger[Integer, TopCMS[T]](name, context, serdes, name) - - // Note: We must manually guard with `loggingEnabled` here because `context.register()` ignores - // that parameter. - if (root != null && loggingEnabled) { - context.register(root, (_, value) => { - if (value == null) { - cms = cmsMonoid.zero - } - else { - cms = serdes.valueFrom(value) - } - }) - } - - open = true - } - - /** - * Returns the estimated count of the item. - * - * @param item item to be counted - * @return estimated count - */ - def get(item: T): Long = cms.frequency(item).estimate - - /** - * Counts the item. - * - * @param item item to be counted - */ - def put(item: T, timestamp: Long): Unit = { - cms = cms + item - timestampOfLastStateStoreUpdate = timestamp - } - - /** - * The top items counted so far, with the percentage-based cut-off being defined by the CMS - * parameter `heavyHittersPct`. - * - * @return the top items counted so far - */ - def heavyHitters: Set[T] = cms.heavyHitters - - /** - * Returns the total number of items counted ("seen" in the input) so far. - * - * This number is not the same as the total number of unique items counted so far, i.e. - * it is not the cardinality of the set of items. - * - * Example: After having counted the input "foo", "bar", "foo", the return value would be 3. - * - * @return number of count operations so far - */ - def totalCount: Long = cms.totalCount - - override val persistent: Boolean = false - - override def isOpen: Boolean = open - - /** - * Periodically saves the latest CMS state to Kafka. - * - * =Implementation detail= - * - * The changelog records have the form: (hardcodedKey, CMS). That is, we are backing up the - * underlying CMS data structure in its entirety to Kafka. - */ - override def flush(): Unit = { - if (loggingEnabled) { - changeLogger.logChange(changelogKey, cms, timestampOfLastStateStoreUpdate) - } - } - - override def close(): Unit = { - open = false - } - -} \ No newline at end of file diff --git a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreBuilder.scala b/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreBuilder.scala deleted file mode 100644 index 4559f04c81..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreBuilder.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.confluent.examples.streams.algebird - -import java.util - -import com.twitter.algebird.CMSHasher -import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.streams.state.StoreBuilder - -/** - * A factory for Kafka Streams to instantiate a [[CMSStore]]. - * - * =Usage= - * - * The [[CMSStore]]'s changelog will typically have rather few and small records per partition. - * To improve efficiency we thus set a smaller log segment size (`segment.bytes`) than Kafka's - * default of 1GB. - * - * {{{ - * val changelogConfig = { - * val cfg = new java.util.HashMap[String, String] - * val segmentSizeBytes = (20 * 1024 * 1024).toString - * cfg.put("segment.bytes", segmentSizeBytes) - * cfg - * } - * new CMSStoreBuilder[String](cmsStoreName, Serdes.String()).withLoggingEnabled(changelogConfig) - * }}} - */ -class CMSStoreBuilder[T: CMSHasher](val name: String, - val serde: Serde[T]) - extends StoreBuilder[CMSStore[T]] { - - var loggingEnabled = false - var logConfig : util.Map[String, String] = new util.HashMap[String, String]() - - - override def build(): CMSStore[T] = new CMSStore[T](name, loggingEnabled) - - override def withCachingEnabled() = throw new UnsupportedOperationException("caching not supported") - - override def withCachingDisabled() = throw new UnsupportedOperationException("caching not supported") - - /** - * To enable fault-tolerance for the [[CMSStore]]. - */ - override def withLoggingEnabled(config: util.Map[String, String]): CMSStoreBuilder[T] = { - loggingEnabled = true - logConfig.clear() - logConfig.putAll(config) - this - } - - /** - * To disable fault-tolerance for the [[CMSStore]]. - */ - override def withLoggingDisabled(): CMSStoreBuilder[T] = { - loggingEnabled = false - logConfig.clear() - this - } -} diff --git a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala b/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala deleted file mode 100644 index 4bd0d0f0a9..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams.algebird - -import org.apache.kafka.streams.processor.StateStoreContext -import org.apache.kafka.streams.processor.internals.{InternalProcessorContext, ProcessorStateManager, RecordCollector} -import org.apache.kafka.streams.state.StateSerdes - -/** - * Copied from Kafka's [[org.apache.kafka.streams.processor.internals.ProcessorContextImpl]]. - * - * If StoreChangeLogger had been public, we would have used it as-is. - * - * Note that the use of array-typed keys is discouraged because they result in incorrect caching - * behavior. If you intend to work on byte arrays as key, for example, you may want to wrap them - * with the [[org.apache.kafka.common.utils.Bytes]] class. 
- */ -class CMSStoreChangeLogger[K, V](val storeName: String, - val context: StateStoreContext, - val partition: Int, - val serialization: StateSerdes[K, V], - val processorNodeId: String) { - - private val topic = ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName, context.taskId().topologyName()) - private val collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector - - def this(storeName: String, context: StateStoreContext, serialization: StateSerdes[K, V], processorNodeId: String) = { - this(storeName, context, context.taskId.partition(), serialization, processorNodeId) - } - - def logChange(key: K, value: V, timestamp: Long): Unit = { - if (collector != null) { - val keySerializer = serialization.keySerializer - val valueSerializer = serialization.valueSerializer - collector.send(this.topic, key, value, null, this.partition, timestamp, keySerializer, valueSerializer, processorNodeId, context.asInstanceOf[InternalProcessorContext[Void,Void]]) - } - } - -} diff --git a/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala b/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala deleted file mode 100644 index b404c11581..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala +++ /dev/null @@ -1,27 +0,0 @@ -package io.confluent.examples.streams.algebird - -import org.apache.kafka.streams.processor.api.{Processor, ProcessorContext, Record} - -/** - * Counts record values (in String format) probabilistically and then outputs the respective count estimate. - */ -class ProbabilisticCounter(val cmsStoreName: String) - extends Processor[String, String, String, Long] { - - private var cmsState: CMSStore[String] = _ - private var processorContext: ProcessorContext[String, Long] = _ - - override def init(processorContext: ProcessorContext[String, Long]): Unit = { - this.processorContext = processorContext - cmsState = this.processorContext.getStateStore[CMSStore[String]](cmsStoreName) - } - - override def process(record: Record[String, String]): Unit = { - // Count the record value, think: "+ 1" - cmsState.put(record.value(), record.timestamp()) - - // In this example: emit the latest count estimate for the record value. We could also do - // something different, e.g. periodically output the latest heavy hitters via `punctuate`. - processorContext.forward(new Record(record.value(), cmsState.get(record.value()), record.timestamp(), record.headers())) - } -} \ No newline at end of file diff --git a/src/main/scala/io/confluent/examples/streams/algebird/TopCMSSerde.scala b/src/main/scala/io/confluent/examples/streams/algebird/TopCMSSerde.scala deleted file mode 100644 index e99529e5a0..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/TopCMSSerde.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.confluent.examples.streams.algebird - -import java.util - -import com.twitter.algebird.TopCMS -import com.twitter.chill.ScalaKryoInstantiator -import org.apache.kafka.common.errors.SerializationException -import org.apache.kafka.common.serialization._ - -class TopCMSSerializer[T] extends Serializer[TopCMS[T]] { - - override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = { - // nothing to do - } - - override def serialize(topic: String, cms: TopCMS[T]): Array[Byte] = - if (cms == null) null - else ScalaKryoInstantiator.defaultPool.toBytesWithClass(cms) - - override def close(): Unit = { - // nothing to do - } - -} - -class TopCMSDeserializer[T] extends Deserializer[TopCMS[T]] { - - override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = { - // nothing to do - } - - override def deserialize(topic: String, bytes: Array[Byte]): TopCMS[T] = - if (bytes == null) null - else if (bytes.isEmpty) throw new SerializationException("byte array must not be empty") - else ScalaKryoInstantiator.defaultPool.fromBytes(bytes).asInstanceOf[TopCMS[T]] - - override def close(): Unit = { - // nothing to do - } - -} - -/** - * A [[Serde]] for [[TopCMS]]. - * - * =Usage= - * - * {{{ - * val anyTopic = "any-topic" - * val cms: TopCMS[String] = ??? - * val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - * - * val bytes: Array[Byte] = serde.serializer().serialize(anyTopic, cms) - * val restoredCms: TopCMS[String] = serde.deserializer().deserialize(anyTopic, bytes) - * }}} - * - * =Future Work= - * - * We could perhaps be more efficient if we serialized not the full [[TopCMS]] instance but only - * its relevant fields. - */ -object TopCMSSerde { - - def apply[T]: Serde[TopCMS[T]] = Serdes.serdeFrom(new TopCMSSerializer[T], new TopCMSDeserializer[T]) - -} \ No newline at end of file diff --git a/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala deleted file mode 100644 index 950dabc037..0000000000 --- a/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.confluent.examples.streams - -import java.util -import java.util.Properties -import io.confluent.examples.streams.algebird.{CMSStoreBuilder, ProbabilisticCounter} -import org.apache.kafka.common.serialization._ -import org.apache.kafka.streams.kstream.Named -import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.serialization.Serdes._ -import org.apache.kafka.streams.scala.StreamsBuilder -import org.apache.kafka.streams.scala.kstream.KStream -import org.apache.kafka.streams.{StreamsConfig, TopologyTestDriver} -import org.apache.kafka.test.TestUtils -import org.junit._ -import org.scalatestplus.junit.AssertionsForJUnit - -/** - * End-to-end integration test that demonstrates how to probabilistically count items in an input stream. - * - * This example uses a custom state store implementation, [[io.confluent.examples.streams.algebird.CMSStore]], - * that is backed by a Count-Min Sketch data structure. The algorithm is WordCount. - */ -class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit { - - private val inputTopic = "inputTopic" - private val outputTopic = "output-topic" - - @Test - def shouldProbabilisticallyCountWords(): Unit = { - val inputTextLines: Seq[String] = Seq( - "Hello Kafka Streams", - "All streams lead to Kafka", - "Join Kafka Summit" - ) - - val expectedWordCounts: Map[String, Long] = Map( - ("hello", 1L), - ("kafka", 1L), - ("streams", 1L), - ("all", 1L), - ("streams", 2L), - ("lead", 1L), - ("to", 1L), - ("kafka", 2L), - ("join", 1L), - ("kafka", 3L), - ("summit", 1L) - ) - - // Step 1: Create the topology and its configuration - val builder: StreamsBuilder = createTopology() - val streamsConfiguration = createTopologyConfiguration() - - val topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration) - try { - // Step 2: Write the input - import IntegrationTestScalaUtils._ - IntegrationTestScalaUtils.produceValuesSynchronously(inputTopic, inputTextLines, topologyTestDriver) - - // Step 3: Validate the output - val actualWordCounts: Map[String, Long] = - IntegrationTestScalaUtils.drainTableOutput[String, Long](outputTopic, topologyTestDriver) - - // Note: This example only processes a small amount of input data, for which the word counts - // will actually be exact counts. However, for large amounts of input data we would expect to - // observe approximate counts (where the approximate counts would be >= true exact counts). - assert(actualWordCounts === expectedWordCounts) - } finally { - topologyTestDriver.close() - } - } - - def createTopology(): StreamsBuilder = { - - def createCMSStoreBuilder(cmsStoreName: String): CMSStoreBuilder[String] = { - val changelogConfig: util.HashMap[String, String] = { - val cfg = new java.util.HashMap[String, String] - // The CMSStore's changelog will typically have rather few and small records per partition. - // To improve efficiency we thus set a smaller log segment size than Kafka's default of 1GB. 
- val segmentSizeBytes = (20 * 1024 * 1024).toString - cfg.put("segment.bytes", segmentSizeBytes) - cfg - } - new CMSStoreBuilder[String](cmsStoreName, Serdes.String()).withLoggingEnabled(changelogConfig) - } - - val builder = new StreamsBuilder - val cmsStoreName = "cms-store" - builder.addStateStore(createCMSStoreBuilder(cmsStoreName)) - val textLines: KStream[String, String] = builder.stream[String, String](inputTopic) - val approximateWordCounts: KStream[String, Long] = textLines - .flatMapValues(textLine => textLine.toLowerCase.split("\\W+")) - .process(() => new ProbabilisticCounter(cmsStoreName), Named.as("cms-store"), cmsStoreName) - approximateWordCounts.to(outputTopic) - builder - } - - def createTopologyConfiguration(): Properties = { - val p = new Properties() - p.put(StreamsConfig.APPLICATION_ID_CONFIG, "probabilistic-counting-scala-integration-test") - p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config") - // Use a temporary directory for storing state, which will be automatically removed after the test. - p.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory.getAbsolutePath) - p - } - -} \ No newline at end of file diff --git a/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala b/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala deleted file mode 100644 index 542a5557c3..0000000000 --- a/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.confluent.examples.streams.algebird - -import com.twitter.algebird.TopCMS -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.serialization.{Serdes, Serializer} -import org.apache.kafka.common.utils.LogContext -import org.apache.kafka.streams.processor.StateStoreContext -import org.apache.kafka.streams.processor.internals.{InternalProcessorContext, MockStreamsMetrics} -import org.apache.kafka.streams.state.KeyValueStoreTestDriver -import org.apache.kafka.streams.state.internals.ThreadCache -import org.apache.kafka.test.{InternalMockProcessorContext, MockRecordCollector, TestUtils} -import org.assertj.core.api.Assertions.assertThat -import org.junit._ -import org.scalatestplus.junit.AssertionsForJUnit -import org.scalatestplus.mockito.MockitoSugar - -import scala.collection.MapView - -class CMSStoreTest extends AssertionsForJUnit with MockitoSugar { - - private val anyStoreName = "cms-store" - - private def createTestContext[T](driver: KeyValueStoreTestDriver[Integer, TopCMS[T]] = createTestDriver[T](), - changelogRecords: Option[Seq[(Int, TopCMS[T])]] = None - ): InternalMockProcessorContext[Integer, TopCMS[T]] = { - // Write the records to the store's changelog - changelogRecords.getOrElse(Seq.empty).foreach { case (key, cms) => driver.addEntryToRestoreLog(key, cms) } - - // The processor context is what makes the restore data available to a store during - // the store's initialization, hence this is what we must return back. - val processorContext: InternalMockProcessorContext[Integer, TopCMS[T]] = { - val pc = driver.context.asInstanceOf[InternalMockProcessorContext[Integer, TopCMS[T]]] - pc.setTime(1) - pc - } - processorContext - } - - private def createTestDriver[T](): KeyValueStoreTestDriver[Integer, TopCMS[T]] = { - val keySerde = Serdes.Integer() - val cmsSerde = TopCMSSerde[T] - KeyValueStoreTestDriver.create[Integer, TopCMS[T]]( - keySerde.serializer(), - keySerde.deserializer(), - cmsSerde.serializer(), - cmsSerde.deserializer()) - } - - @Test - def shouldBeChangeloggingByDefault(): Unit = { - val store = new CMSStore[String](anyStoreName) - assertThat(store.loggingEnabled).isTrue - } - - @Test - def shouldBeNonPersistentStore(): Unit = { - val store = new CMSStore[String](anyStoreName) - assertThat(store.persistent).isFalse - } - - @Test - def shouldBeClosedBeforeInit(): Unit = { - val store = new CMSStore[String](anyStoreName) - assertThat(store.isOpen).isFalse - } - - @Test - def shouldBeOpenAfterInit(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName) - val processorContext = createTestContext[String]() - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // Then - assertThat(store.isOpen).isTrue - } - - @Test - def shouldBeClosedAfterClosing(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName) - - // When - store.close() - - // Then - assertThat(store.isOpen).isFalse - } - - - @Test - def shouldExactlyCountSmallNumbersOfItems(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName) - val items = Seq( - ("foo", System.currentTimeMillis()), - ("bar", System.currentTimeMillis()), - ("foo", System.currentTimeMillis()), - ("foo", System.currentTimeMillis()), - ("quux", System.currentTimeMillis()), - ("bar", System.currentTimeMillis()), - ("foor", System.currentTimeMillis())) - val 
processorContext = createTestContext() - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // When - items.foreach(x => store.put(x._1, x._2)) - - // Note: We intentionally do not flush the store in this test. - - // Then - assertThat(store.totalCount).isEqualTo(items.size) - assertThat(store.heavyHitters).isEqualTo(items.map(x => x._1).toSet) - val expWordCounts: MapView[String, Int] = items.map(x => x._1).groupBy(identity).view.mapValues(_.length) - expWordCounts.foreach { case (word, count) => assertThat(store.get(word)).isEqualTo(count) } - } - - @Test - def shouldBackupToChangelogIfLoggingIsEnabled(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val processorContext = createTestContext(driver) - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // When - val items = Seq( - ("one", System.currentTimeMillis()), - ("two", System.currentTimeMillis()), - ("three", System.currentTimeMillis())) - items.foreach(x => store.put(x._1, x._2)) - store.flush() - - // Then - val cms: TopCMS[String] = store.cmsFrom(items.map(x => x._1)) - assertThat(driver.flushedEntryStored(store.changelogKey)).isEqualTo(cms) - assertThat(driver.flushedEntryRemoved(store.changelogKey)).isFalse - } - - @Test - def shouldBackupToChangelogOnlyOnFlush(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val observedChangelogRecords = new java.util.ArrayList[ProducerRecord[_, _]] - val processorContext = { - // We must use a "spying" RecordCollector because, unfortunately, Kafka's - // KeyValueStoreTestDriver is not providing any such facilities. 
- val observingCollector: MockRecordCollector = new MockRecordCollector() { - override def send[K, V](topic: String, - key: K, - value: V, - headers: Headers, - partition: Integer, - timestamp: java.lang.Long, - keySerializer: Serializer[K], - valueSerializer: Serializer[V], - processorNodeId: String, - internalContext: InternalProcessorContext[Void,Void]): Unit = { - observedChangelogRecords.add(new ProducerRecord[K, V](topic, partition, timestamp, key, value)) - } - } - val cache: ThreadCache = new ThreadCache(new LogContext("test"), 1024, new MockStreamsMetrics(new Metrics)) - val context = new InternalMockProcessorContext(TestUtils.tempDirectory, Serdes.Integer(), TopCMSSerde[String], observingCollector, cache) - context.setTime(1) - context - } - store.init(processorContext.asInstanceOf[StateStoreContext], store) - val items = Seq( - ("one", System.currentTimeMillis()), - ("two", System.currentTimeMillis()), - ("three", System.currentTimeMillis()), - ("four", System.currentTimeMillis()), - ("firve", System.currentTimeMillis())) - - // When - items.foreach(x => store.put(x._1, x._2)) - // Then - assertThat(observedChangelogRecords.size()).isEqualTo(0) - - // When - store.flush() - // Then - assertThat(observedChangelogRecords.size()).isEqualTo(1) - } - - @Test - def shouldNotBackupToChangelogIfLoggingIsDisabled(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val processorContext = createTestContext(driver) - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = false) - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // When - val items = Seq( - ("one", System.currentTimeMillis()), - ("two", System.currentTimeMillis()), - ("three", System.currentTimeMillis())) - items.foreach(x => store.put(x._1, x._2)) - store.flush() - - // Then - assertThat(driver.flushedEntryStored(store.changelogKey)).isNull() - assertThat(driver.flushedEntryRemoved(store.changelogKey)).isFalse - } - - @Test - def shouldRestoreFromEmptyChangelog(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val processorContext = createTestContext[String](driver) - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - processorContext.restore(store.name, driver.restoredEntries()) - - // Then - assertThat(store.totalCount).isZero - assertThat(store.heavyHitters.size).isZero - } - - @Test - def shouldRestoreFromNonEmptyChangelog(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val items: Seq[String] = Seq("foo", "bar", "foo", "foo", "quux", "bar", "foo") - val processorContext: InternalMockProcessorContext[Integer, TopCMS[String]] = { - val changelogKeyDoesNotMatter = 123 - val cms: TopCMS[String] = store.cmsFrom(items) - val changelogRecords = Seq((changelogKeyDoesNotMatter, cms)) - createTestContext(driver, changelogRecords = Some(changelogRecords)) - } - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - processorContext.restore(store.name, driver.restoredEntries()) - - // Then - val expWordCounts: MapView[String, Int] = items.groupBy(identity).view.mapValues(_.length) - expWordCounts.foreach { case (word, count) => 
assertThat(store.get(word)).isEqualTo(count) } - } - - @Test - def shouldRestoreFromChangelogTombstone(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val processorContext: InternalMockProcessorContext[Integer, TopCMS[String]] = { - val changelogKeyDoesNotMatter = 123 - val tombstone: TopCMS[String] = null - val changelogRecords = Seq((changelogKeyDoesNotMatter, tombstone)) - createTestContext(driver, changelogRecords = Some(changelogRecords)) - } - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - processorContext.restore(store.name, driver.restoredEntries()) - - // Then - assertThat(store.totalCount).isZero - assertThat(store.heavyHitters.size).isZero - } - - @Test - def shouldRestoreFromLatestChangelogRecordOnly(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val expectedItems = Seq("foo", "bar", "foo", "foo", "quux", "bar", "foo") - val unexpectedItems1 = Seq("something", "entirely", "different") - val unexpectedItems2 = Seq("even", "more", "different") - val processorContext: InternalMockProcessorContext[Integer, TopCMS[String]] = { - val cms = store.cmsFrom(expectedItems) - val differentCms1 = store.cmsFrom(unexpectedItems1) - val differentCms2 = store.cmsFrom(unexpectedItems2) - val sameKey = 123 - val differentKey = 456 - val changelogRecords = Seq( - (sameKey, differentCms1), - (differentKey, differentCms2), - (sameKey, cms) /* latest entry in the changelog should "win" */ - ) - createTestContext(driver, changelogRecords = Some(changelogRecords)) - } - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - processorContext.restore(store.name, driver.restoredEntries()) - - // Then - val expWordCounts: MapView[String, Int] = expectedItems.groupBy(identity).view.mapValues(_.length) - expWordCounts.foreach { case (word, count) => assertThat(store.get(word)).isEqualTo(count) } - // Note: The asserts below work only because, given the test setup, we are sure not to run into - // CMS hash collisions that would lead to non-zero counts even for un-counted items. 
- unexpectedItems1.toSet[String].foreach(item => assertThat(store.get(item)).isEqualTo(0)) - unexpectedItems2.toSet[String].foreach(item => assertThat(store.get(item)).isEqualTo(0)) - } - - @Test - def shouldNotRestoreFromChangelogIfLoggingIsDisabled(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = false) - val items: Seq[String] = Seq("foo", "bar", "foo", "foo", "quux", "bar", "foo") - val processorContext: InternalMockProcessorContext[Integer, TopCMS[String]] = { - val changelogKeyDoesNotMatter = 123 - val cms: TopCMS[String] = store.cmsFrom(items) - val changelogRecords = Seq((changelogKeyDoesNotMatter, cms)) - createTestContext(changelogRecords = Some(changelogRecords)) - } - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // Then - assertThat(store.totalCount).isZero - assertThat(store.heavyHitters.size).isZero - } - -} \ No newline at end of file diff --git a/src/test/scala/io/confluent/examples/streams/algebird/TopCMSSerdeTest.scala b/src/test/scala/io/confluent/examples/streams/algebird/TopCMSSerdeTest.scala deleted file mode 100644 index ad99de917a..0000000000 --- a/src/test/scala/io/confluent/examples/streams/algebird/TopCMSSerdeTest.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.confluent.examples.streams.algebird - -import com.twitter.algebird.{TopCMS, TopPctCMS, TopPctCMSMonoid} -import org.apache.kafka.common.errors.SerializationException -import org.apache.kafka.common.serialization.Serde -import org.assertj.core.api.Assertions.assertThat -import org.junit._ -import org.scalatestplus.junit.AssertionsForJUnit -import org.scalatestplus.mockito.MockitoSugar - -class TopCMSSerdeTest extends AssertionsForJUnit with MockitoSugar { - - private val anyTopic = "any-topic" - - @Test - def shouldRoundtrip(): Unit = { - // Given - val cmsMonoid: TopPctCMSMonoid[String] = { - val delta: Double = 1E-10 - val eps: Double = 0.001 - val seed: Int = 1 - val heavyHittersPct: Double = 0.01 - TopPctCMS.monoid[String](eps, delta, seed, heavyHittersPct) - } - val itemToBeCounted = "count-me" - val cms: TopCMS[String] = cmsMonoid.create(itemToBeCounted) - val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - - // When - val bytes: Array[Byte] = serde.serializer().serialize(anyTopic, cms) - val restoredCms = serde.deserializer().deserialize(anyTopic, bytes) - - // Then - assertThat(restoredCms).isEqualTo(cms) - assertThat(restoredCms.frequency(itemToBeCounted).estimate).isEqualTo(cms.frequency(itemToBeCounted).estimate) - } - - @Test - def shouldSerializeNullToNull(): Unit = { - val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - assertThat(serde.serializer.serialize(anyTopic, null)).isNull() - serde.close() - } - - @Test - def shouldDeserializeNullToNull(): Unit = { - val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - assertThat(serde.deserializer.deserialize(anyTopic, null)).isNull() - serde.close() - } - - @Test - def shouldThrowSerializationExceptionWhenDeserializingZeroBytes(): Unit = { - val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - try { - serde.deserializer.deserialize(anyTopic, new Array[Byte](0)) - fail("Should have thrown a SerializationException because of zero input bytes"); - } - catch { - case _: SerializationException => // Ignore (there's no contract on the details of the exception) - } - serde.close() - } - -} \ No newline at end of file From d79f0369ae96f9dd52d775aae5761e7989b5d8fa Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 23 Dec 2024 09:51:35 -0800 Subject: [PATCH 6/8] update readme --- README.md | 48 +++++++++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index df4a39399d..4177b23148 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ # Kafka Streams Examples +> [!NOTE] +> This repo is replace with [Apache Tutorial](https://github.com/confluentinc/tutorials). We still "keep the lights on", +but we don't improve existing examples any longer, nor do we add new example. + This project contains code examples that demonstrate how to implement real-time applications and event-driven microservices using the Streams API of [Apache Kafka](http://kafka.apache.org/) aka Kafka Streams. @@ -254,17 +258,14 @@ If you are using Eclipse, you can also right-click on `pom.xml` file and choose -## Java 8+ - -Some code examples require Java 8+, primarily because of the usage of -[lambda expressions](https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html). +## Java 17+ IntelliJ IDEA users: * Open _File > Project structure_ * Select "Project" on the left. - * Set "Project SDK" to Java 1.8. - * Set "Project language level" to "8 - Lambdas, type annotations, etc." + * Set "Project SDK" to Java 17. 
+ * Set "Project language level" to "17 - Sealed types, always-strict floating-point semantics" @@ -274,11 +275,7 @@ IntelliJ IDEA users: > Scala is required only for the Scala examples in this repository. If you are a Java developer you can safely ignore > this section. -If you want to experiment with the Scala examples in this repository, you need a version of Scala that supports Java 8 -and SAM / Java lambda (e.g. Scala 2.11 with `-Xexperimental` compiler flag, or 2.12). - -If you are compiling with Java 9+, you'll need to have Scala version 2.12+ to be compatible with the Java version. - +If you want to experiment with the Scala examples in this repository, you need a version of Scala that supports Java 17. @@ -300,10 +297,13 @@ In a nutshell: ```shell # Ensure you have downloaded and installed Confluent Platform as per the Quickstart instructions above. -# Start ZooKeeper -$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties +#Generate a Cluster UUID +$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)" -# In a separate terminal, start Kafka broker +#Format Log Directories +$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties + +# Start the Kafka broker $ ./bin/kafka-server-start ./etc/kafka/server.properties # In a separate terminal, start Confluent Schema Registry @@ -378,17 +378,15 @@ $ mvn package # Packages the application examples into a standalone jar | Branch (this repo) | Confluent Platform | Apache Kafka | | ----------------------------------------|--------------------|-------------------| -| [5.4.x](../../../tree/5.4.x/)\* | 5.4.0-SNAPSHOT | 2.4.0-SNAPSHOT | -| [5.3.0-post](../../../tree/5.3.0-post/) | 5.3.0 | 2.3.0 | -| [5.2.2-post](../../../tree/5.2.2-post/) | 5.2.2 | 2.2.1 | -| [5.2.1-post](../../../tree/5.2.1-post/) | 5.2.1 | 2.2.1 | -| [5.1.0-post](../../../tree/5.1.0-post/) | 5.1.0 | 2.1.0 | -| [5.0.0-post](../../../tree/5.0.0-post/) | 5.0.0 | 2.0.0 | -| [4.1.0-post](../../../tree/4.1.0-post/) | 4.1.0 | 1.1.0 | -| [4.0.0-post](../../../tree/4.4.0-post/) | 4.0.0 | 1.0.0 | -| [3.3.0-post](../../../tree/3.3.0-post/) | 3.3.0 | 0.11.0 | - -\*You must manually build the `2.3` version of Apache Kafka and the `5.3.x` version of Confluent Platform. See instructions above. +| [master](../../../tree/master/)\* | 8.0.0-SNAPSHOT | 4.0.0-SNAPSHOT | +| [7.9.x](../../../tree/7.9.x/) | 7.9.0-SNAPSHOT | 3.9.0 | +| [7.8.0-post](../../../tree/7.8.0-post/) | 7.8.0 | 3.8.0 | +| ... | | | +| [7.1.0-post](../../../tree/7.1.0-post/) | 7.1.0 | 3.1.0 | + +Older version prior to 7.1.0 are [not supported any longer](https://docs.confluent.io/platform/current/installation/versions-interoperability.html). + +\*You must manually build the `4.0` version of Apache Kafka and the `8.0.0` version of Confluent Platform. See instructions above. The `master` branch of this repository represents active development, and may require additional steps on your side to make it compile. Check this README as well as [pom.xml](pom.xml) for any such information. From e289bd13b5e3c1e10b2ab52f084c6b93b3b36362 Mon Sep 17 00:00:00 2001 From: "Matthias J. 
Sax" Date: Mon, 23 Dec 2024 09:56:47 -0800 Subject: [PATCH 7/8] update --- .semaphore/semaphore.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index c16d6d6def..1fcc93ae51 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -155,7 +155,7 @@ after_pipeline: - name: SonarQube commands: - checkout - - sem-version java 17 + - if [[ $SEMAPHORE_GIT_BRANCH =~ ^7\..* ]]; then sem-version java 8; else sem-version java 17; fi - artifact pull workflow target-AMD - artifact pull workflow target-ARM - emit-sonarqube-data --run_only_sonar_scan From ab6f9487a63ea57ae811da8ab58e4764c7727262 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 23 Dec 2024 09:59:01 -0800 Subject: [PATCH 8/8] readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 4177b23148..96518db8c8 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Kafka Streams Examples > [!NOTE] -> This repo is replace with [Apache Tutorial](https://github.com/confluentinc/tutorials). We still "keep the lights on", -but we don't improve existing examples any longer, nor do we add new example. +> This repo is replaced with [Confluent Tutorials for Apache Kafka](https://github.com/confluentinc/tutorials). +We still "keep the lights on", but we don't improve existing examples any longer, nor do we add new example. This project contains code examples that demonstrate how to implement real-time applications and event-driven microservices using the Streams API of [Apache Kafka](http://kafka.apache.org/) aka Kafka Streams.