diff --git a/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala b/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala deleted file mode 100644 index cc38cdcb84..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/CMSStore.scala +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams.algebird - -import com.twitter.algebird.{CMSHasher, TopCMS, TopPctCMS} -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.processor.{StateStore, StateStoreContext} -import org.apache.kafka.streams.state.StateSerdes - -/** - * An in-memory store that leverages the Count-Min Sketch implementation of - * [[https://github.com/twitter/algebird Twitter Algebird]]. - * - * This store allows you to probabilistically count items of type T with a - * [[https://en.wikipedia.org/wiki/Count%E2%80%93min_sketch Count-Min Sketch]] data structure. - * Here, the counts returned by the store will be approximate counts, i.e. estimations, because a - * Count-Min Sketch trades slightly inaccurate counts for greatly reduced space utilization - * (however, the estimation error is mathematically proven to be bounded). - * With probability at least `1 - delta`, this estimate is within `eps * N` of the true frequency - * (i.e., `true frequency <= estimate <= true frequency + eps * N`), where `N` is the total number - * of items counted ("seen" in the input) so far (cf. [[CMSStore#totalCount]]). - * - * A traditional Count-Min Sketch is a fixed-size data structure that is essentially an array of - * counters of a particular width (derived from the parameter `eps`) and depth (derived from the - * parameter `delta`). The CMS variant used in this store, [[TopPctCMS]], additionally tracks the - * so-called "heavy hitters" among the counted items (i.e. the items with the largest counts) based - * on a percentage threshold; the size of heavy hitters is still bounded, however, hence the total - * size of the [[TopPctCMS]] data structure is still fixed. - * - * =Fault-tolerance= - * - * This store supports changelogging its state to Kafka and is thus fault-tolerant. Every time the - * store is flushed (cf. [[org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG]]) the - * underlying CMS data structure is written to the store's changelog topic. For many use cases - * this approach should be sufficiently efficient because the absolute size of a CMS is typically - * rather small (a few KBs up to a megabyte, depending on the CMS settings, which are determined by - * e.g. your error bound requirements for approximate counts). - * - * =Usage= - * - * Note: Twitter Algebird is best used with Scala, so all the examples below are in Scala, too. - * - * In a Kafka Streams application, you'd typically create this store as such: - * - * {{{ - * val builder: StreamsBuilder = new StreamsBuilder() - * - * // In this example, we create a store for type [[String]]. - * // It's recommended to reduce Kafka's log segment size for the changelogs of CMS stores, which - * // you can do by passing the respective Kafka setting to the CMSStoreBuilder via `withLoggingEnabled()`. - * builder.addStateStore(new CMSStoreBuilder[String]("my-cms-store-name", Serdes.String())) - * }}} - * - * Then you'd use the store within a [[org.apache.kafka.streams.processor.Processor]] or a - * [[org.apache.kafka.streams.kstream.Transformer]] similar to: - * - * {{{ - * class ProbabilisticCounter extends Transformer[Array[Byte], String, KeyValue[String, Long]] { - * - * private var cmsState: CMSStore[String] = _ - * private var processorContext: ProcessorContext = _ - * - * override def init(processorContext: ProcessorContext): Unit = { - * this.processorContext = processorContext - * cmsState = this.processorContext.getStateStore("my-cms-store-name").asInstanceOf[CMSStore[String]] - * } - * - * override def transform(key: Array[Byte], value: String): KeyValue[String, Long] = { - * // Count the record value, think: "+ 1" - * cmsState.put(value) - * - * // Emit the latest count estimate for the record value - * KeyValue.pair[String, Long](value, cmsState.get(value)) - * } - * - * override def punctuate(l: Long): KeyValue[String, Long] = null - * - * override def close(): Unit = {} - * } - * }}} - * - * @param name The name of this store instance - * @param loggingEnabled Whether or not changelogging (fault-tolerance) is enabled for this store. - * @param delta CMS parameter: A bound on the probability that a query estimate does not - * lie within some small interval (an interval that depends on `eps`) around - * the truth. - * See [[TopPctCMS]] and [[com.twitter.algebird.CMSMonoid]]. - * @param eps CMS parameter: One-sided error bound on the error of each point query, - * i.e. frequency estimate. - * See [[TopPctCMS]] and [[com.twitter.algebird.CMSMonoid]]. - * @param seed CMS parameter: A seed to initialize the random number generator used to - * create the pairwise independent hash functions. Typically you do not - * need to change this. - * See [[TopPctCMS]] and [[com.twitter.algebird.CMSMonoid]]. - * @param heavyHittersPct CMS parameter: A threshold for finding heavy hitters, i.e., items that - * appear at least (heavyHittersPct * totalCount) times in the stream. - * Every item that appears at least `(heavyHittersPct * totalCount)` times - * is included, and with probability `p >= 1 - delta`, no item whose count - * is less than `(heavyHittersPct - eps) * totalCount` is included. - * This also means that this parameter is an upper bound on the number of - * heavy hitters that will be tracked: the set of heavy hitters contains at - * most `1 / heavyHittersPct` elements. For example, if - * `heavyHittersPct=0.01` (or 0.25), then at most `1 / 0.01 = 100` items - * or `1 / 0.25 = 4` items) will be tracked/returned as heavy hitters. - * This parameter can thus control the memory footprint required for - * tracking heavy hitters. - * See [[TopPctCMS]] and [[com.twitter.algebird.TopPctCMSMonoid]]. - * @tparam T The type used to identify the items to be counted with the CMS. For example, if - * you want to count the occurrence of user names, you could use count user names - * directly with `T=String`; alternatively, you could map each username to a unique - * numeric ID expressed as a `Long`, and then count the occurrences of those `Long`s with - * a CMS of type `T=Long`. Note that such a mapping between the items of your problem - * domain and their identifiers used for counting via CMS should be bijective. - * We require a [[CMSHasher]] context bound for `K`, see [[CMSHasher]] for available - * implicits that can be imported. - * See [[com.twitter.algebird.CMSMonoid]] for further information. - */ -class CMSStore[T: CMSHasher](override val name: String, - val loggingEnabled: Boolean = true, - val delta: Double = 1E-10, - val eps: Double = 0.001, - val seed: Int = 1, - val heavyHittersPct: Double = 0.01) - extends StateStore { - - private val cmsMonoid = TopPctCMS.monoid[T](eps, delta, seed, heavyHittersPct) - - /** - * The "storage backend" of this store. - * - * Needs proper initializing in case the store's changelog is empty. - */ - private var cms: TopCMS[T] = cmsMonoid.zero - - private var timestampOfLastStateStoreUpdate: Long = 0L - - private var changeLogger: CMSStoreChangeLogger[Integer, TopCMS[T]] = _ - - /** - * The record key used to write to the state's changelog. - * - * This key can be a constant because: - * - * 1. We always write the full CMS when writing to the changelog. - * 2. A CMS does not retain information about which items were counted, i.e. it does not track - * information about the keyspace (in the case of this store, the only information about the - * keyspace are the heavy hitters); so, unless we opted for a different approach than (1) - * above, we cannot leverage keyspace information anyways. - * 3. We use a [[CMSStoreChangeLogger]] that uses a stream task's - * [[org.apache.kafka.streams.processor.TaskId]] to identify the changelog partition to write to. - * Thus only one particular stream task will ever be writing to that changelog partition. - * 4. When restoring from the changelog, a stream task will read only its own (one) changelog - * partition. - * - * In other words, we can hardcode the changelog key because only the "right" stream task will be - * (a) writing to AND (b) reading from the respective partition of the changelog. - */ - private[algebird] val changelogKey = 42 - - /** - * For unit testing - */ - private[algebird] def cmsFrom(items: Seq[T]): TopCMS[T] = cmsMonoid.create(items) - - /** - * For unit testing - */ - private[algebird] def cmsFrom(item: T): TopCMS[T] = cmsMonoid.create(item) - - @volatile private var open: Boolean = false - - /** - * Initializes this store, including restoring the store's state from its changelog. - */ - override def init(context: StateStoreContext, root: StateStore): Unit = { - val serdes = new StateSerdes[Integer, TopCMS[T]]( - name, - Serdes.Integer(), - TopCMSSerde[T]) - changeLogger = new CMSStoreChangeLogger[Integer, TopCMS[T]](name, context, serdes, name) - - // Note: We must manually guard with `loggingEnabled` here because `context.register()` ignores - // that parameter. - if (root != null && loggingEnabled) { - context.register(root, (_, value) => { - if (value == null) { - cms = cmsMonoid.zero - } - else { - cms = serdes.valueFrom(value) - } - }) - } - - open = true - } - - /** - * Returns the estimated count of the item. - * - * @param item item to be counted - * @return estimated count - */ - def get(item: T): Long = cms.frequency(item).estimate - - /** - * Counts the item. - * - * @param item item to be counted - */ - def put(item: T, timestamp: Long): Unit = { - cms = cms + item - timestampOfLastStateStoreUpdate = timestamp - } - - /** - * The top items counted so far, with the percentage-based cut-off being defined by the CMS - * parameter `heavyHittersPct`. - * - * @return the top items counted so far - */ - def heavyHitters: Set[T] = cms.heavyHitters - - /** - * Returns the total number of items counted ("seen" in the input) so far. - * - * This number is not the same as the total number of unique items counted so far, i.e. - * it is not the cardinality of the set of items. - * - * Example: After having counted the input "foo", "bar", "foo", the return value would be 3. - * - * @return number of count operations so far - */ - def totalCount: Long = cms.totalCount - - override val persistent: Boolean = false - - override def isOpen: Boolean = open - - /** - * Periodically saves the latest CMS state to Kafka. - * - * =Implementation detail= - * - * The changelog records have the form: (hardcodedKey, CMS). That is, we are backing up the - * underlying CMS data structure in its entirety to Kafka. - */ - override def flush(): Unit = { - if (loggingEnabled) { - changeLogger.logChange(changelogKey, cms, timestampOfLastStateStoreUpdate) - } - } - - override def close(): Unit = { - open = false - } - -} \ No newline at end of file diff --git a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreBuilder.scala b/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreBuilder.scala deleted file mode 100644 index 4559f04c81..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreBuilder.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams.algebird - -import java.util - -import com.twitter.algebird.CMSHasher -import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.streams.state.StoreBuilder - -/** - * A factory for Kafka Streams to instantiate a [[CMSStore]]. - * - * =Usage= - * - * The [[CMSStore]]'s changelog will typically have rather few and small records per partition. - * To improve efficiency we thus set a smaller log segment size (`segment.bytes`) than Kafka's - * default of 1GB. - * - * {{{ - * val changelogConfig = { - * val cfg = new java.util.HashMap[String, String] - * val segmentSizeBytes = (20 * 1024 * 1024).toString - * cfg.put("segment.bytes", segmentSizeBytes) - * cfg - * } - * new CMSStoreBuilder[String](cmsStoreName, Serdes.String()).withLoggingEnabled(changelogConfig) - * }}} - */ -class CMSStoreBuilder[T: CMSHasher](val name: String, - val serde: Serde[T]) - extends StoreBuilder[CMSStore[T]] { - - var loggingEnabled = false - var logConfig : util.Map[String, String] = new util.HashMap[String, String]() - - - override def build(): CMSStore[T] = new CMSStore[T](name, loggingEnabled) - - override def withCachingEnabled() = throw new UnsupportedOperationException("caching not supported") - - override def withCachingDisabled() = throw new UnsupportedOperationException("caching not supported") - - /** - * To enable fault-tolerance for the [[CMSStore]]. - */ - override def withLoggingEnabled(config: util.Map[String, String]): CMSStoreBuilder[T] = { - loggingEnabled = true - logConfig.clear() - logConfig.putAll(config) - this - } - - /** - * To disable fault-tolerance for the [[CMSStore]]. - */ - override def withLoggingDisabled(): CMSStoreBuilder[T] = { - loggingEnabled = false - logConfig.clear() - this - } -} diff --git a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala b/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala deleted file mode 100644 index 4bd0d0f0a9..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/CMSStoreChangeLogger.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams.algebird - -import org.apache.kafka.streams.processor.StateStoreContext -import org.apache.kafka.streams.processor.internals.{InternalProcessorContext, ProcessorStateManager, RecordCollector} -import org.apache.kafka.streams.state.StateSerdes - -/** - * Copied from Kafka's [[org.apache.kafka.streams.processor.internals.ProcessorContextImpl]]. - * - * If StoreChangeLogger had been public, we would have used it as-is. - * - * Note that the use of array-typed keys is discouraged because they result in incorrect caching - * behavior. If you intend to work on byte arrays as key, for example, you may want to wrap them - * with the [[org.apache.kafka.common.utils.Bytes]] class. - */ -class CMSStoreChangeLogger[K, V](val storeName: String, - val context: StateStoreContext, - val partition: Int, - val serialization: StateSerdes[K, V], - val processorNodeId: String) { - - private val topic = ProcessorStateManager.storeChangelogTopic(context.applicationId, storeName, context.taskId().topologyName()) - private val collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector - - def this(storeName: String, context: StateStoreContext, serialization: StateSerdes[K, V], processorNodeId: String) = { - this(storeName, context, context.taskId.partition(), serialization, processorNodeId) - } - - def logChange(key: K, value: V, timestamp: Long): Unit = { - if (collector != null) { - val keySerializer = serialization.keySerializer - val valueSerializer = serialization.valueSerializer - collector.send(this.topic, key, value, null, this.partition, timestamp, keySerializer, valueSerializer, processorNodeId, context.asInstanceOf[InternalProcessorContext[Void,Void]]) - } - } - -} diff --git a/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala b/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala deleted file mode 100644 index b404c11581..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/ProbabilisticCounter.scala +++ /dev/null @@ -1,27 +0,0 @@ -package io.confluent.examples.streams.algebird - -import org.apache.kafka.streams.processor.api.{Processor, ProcessorContext, Record} - -/** - * Counts record values (in String format) probabilistically and then outputs the respective count estimate. - */ -class ProbabilisticCounter(val cmsStoreName: String) - extends Processor[String, String, String, Long] { - - private var cmsState: CMSStore[String] = _ - private var processorContext: ProcessorContext[String, Long] = _ - - override def init(processorContext: ProcessorContext[String, Long]): Unit = { - this.processorContext = processorContext - cmsState = this.processorContext.getStateStore[CMSStore[String]](cmsStoreName) - } - - override def process(record: Record[String, String]): Unit = { - // Count the record value, think: "+ 1" - cmsState.put(record.value(), record.timestamp()) - - // In this example: emit the latest count estimate for the record value. We could also do - // something different, e.g. periodically output the latest heavy hitters via `punctuate`. - processorContext.forward(new Record(record.value(), cmsState.get(record.value()), record.timestamp(), record.headers())) - } -} \ No newline at end of file diff --git a/src/main/scala/io/confluent/examples/streams/algebird/TopCMSSerde.scala b/src/main/scala/io/confluent/examples/streams/algebird/TopCMSSerde.scala deleted file mode 100644 index e99529e5a0..0000000000 --- a/src/main/scala/io/confluent/examples/streams/algebird/TopCMSSerde.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams.algebird - -import java.util - -import com.twitter.algebird.TopCMS -import com.twitter.chill.ScalaKryoInstantiator -import org.apache.kafka.common.errors.SerializationException -import org.apache.kafka.common.serialization._ - -class TopCMSSerializer[T] extends Serializer[TopCMS[T]] { - - override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = { - // nothing to do - } - - override def serialize(topic: String, cms: TopCMS[T]): Array[Byte] = - if (cms == null) null - else ScalaKryoInstantiator.defaultPool.toBytesWithClass(cms) - - override def close(): Unit = { - // nothing to do - } - -} - -class TopCMSDeserializer[T] extends Deserializer[TopCMS[T]] { - - override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = { - // nothing to do - } - - override def deserialize(topic: String, bytes: Array[Byte]): TopCMS[T] = - if (bytes == null) null - else if (bytes.isEmpty) throw new SerializationException("byte array must not be empty") - else ScalaKryoInstantiator.defaultPool.fromBytes(bytes).asInstanceOf[TopCMS[T]] - - override def close(): Unit = { - // nothing to do - } - -} - -/** - * A [[Serde]] for [[TopCMS]]. - * - * =Usage= - * - * {{{ - * val anyTopic = "any-topic" - * val cms: TopCMS[String] = ??? - * val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - * - * val bytes: Array[Byte] = serde.serializer().serialize(anyTopic, cms) - * val restoredCms: TopCMS[String] = serde.deserializer().deserialize(anyTopic, bytes) - * }}} - * - * =Future Work= - * - * We could perhaps be more efficient if we serialized not the full [[TopCMS]] instance but only - * its relevant fields. - */ -object TopCMSSerde { - - def apply[T]: Serde[TopCMS[T]] = Serdes.serdeFrom(new TopCMSSerializer[T], new TopCMSDeserializer[T]) - -} \ No newline at end of file diff --git a/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala b/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala deleted file mode 100644 index 950dabc037..0000000000 --- a/src/test/scala/io/confluent/examples/streams/ProbabilisticCountingScalaIntegrationTest.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams - -import java.util -import java.util.Properties -import io.confluent.examples.streams.algebird.{CMSStoreBuilder, ProbabilisticCounter} -import org.apache.kafka.common.serialization._ -import org.apache.kafka.streams.kstream.Named -import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.serialization.Serdes._ -import org.apache.kafka.streams.scala.StreamsBuilder -import org.apache.kafka.streams.scala.kstream.KStream -import org.apache.kafka.streams.{StreamsConfig, TopologyTestDriver} -import org.apache.kafka.test.TestUtils -import org.junit._ -import org.scalatestplus.junit.AssertionsForJUnit - -/** - * End-to-end integration test that demonstrates how to probabilistically count items in an input stream. - * - * This example uses a custom state store implementation, [[io.confluent.examples.streams.algebird.CMSStore]], - * that is backed by a Count-Min Sketch data structure. The algorithm is WordCount. - */ -class ProbabilisticCountingScalaIntegrationTest extends AssertionsForJUnit { - - private val inputTopic = "inputTopic" - private val outputTopic = "output-topic" - - @Test - def shouldProbabilisticallyCountWords(): Unit = { - val inputTextLines: Seq[String] = Seq( - "Hello Kafka Streams", - "All streams lead to Kafka", - "Join Kafka Summit" - ) - - val expectedWordCounts: Map[String, Long] = Map( - ("hello", 1L), - ("kafka", 1L), - ("streams", 1L), - ("all", 1L), - ("streams", 2L), - ("lead", 1L), - ("to", 1L), - ("kafka", 2L), - ("join", 1L), - ("kafka", 3L), - ("summit", 1L) - ) - - // Step 1: Create the topology and its configuration - val builder: StreamsBuilder = createTopology() - val streamsConfiguration = createTopologyConfiguration() - - val topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration) - try { - // Step 2: Write the input - import IntegrationTestScalaUtils._ - IntegrationTestScalaUtils.produceValuesSynchronously(inputTopic, inputTextLines, topologyTestDriver) - - // Step 3: Validate the output - val actualWordCounts: Map[String, Long] = - IntegrationTestScalaUtils.drainTableOutput[String, Long](outputTopic, topologyTestDriver) - - // Note: This example only processes a small amount of input data, for which the word counts - // will actually be exact counts. However, for large amounts of input data we would expect to - // observe approximate counts (where the approximate counts would be >= true exact counts). - assert(actualWordCounts === expectedWordCounts) - } finally { - topologyTestDriver.close() - } - } - - def createTopology(): StreamsBuilder = { - - def createCMSStoreBuilder(cmsStoreName: String): CMSStoreBuilder[String] = { - val changelogConfig: util.HashMap[String, String] = { - val cfg = new java.util.HashMap[String, String] - // The CMSStore's changelog will typically have rather few and small records per partition. - // To improve efficiency we thus set a smaller log segment size than Kafka's default of 1GB. - val segmentSizeBytes = (20 * 1024 * 1024).toString - cfg.put("segment.bytes", segmentSizeBytes) - cfg - } - new CMSStoreBuilder[String](cmsStoreName, Serdes.String()).withLoggingEnabled(changelogConfig) - } - - val builder = new StreamsBuilder - val cmsStoreName = "cms-store" - builder.addStateStore(createCMSStoreBuilder(cmsStoreName)) - val textLines: KStream[String, String] = builder.stream[String, String](inputTopic) - val approximateWordCounts: KStream[String, Long] = textLines - .flatMapValues(textLine => textLine.toLowerCase.split("\\W+")) - .process(() => new ProbabilisticCounter(cmsStoreName), Named.as("cms-store"), cmsStoreName) - approximateWordCounts.to(outputTopic) - builder - } - - def createTopologyConfiguration(): Properties = { - val p = new Properties() - p.put(StreamsConfig.APPLICATION_ID_CONFIG, "probabilistic-counting-scala-integration-test") - p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config") - // Use a temporary directory for storing state, which will be automatically removed after the test. - p.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory.getAbsolutePath) - p - } - -} \ No newline at end of file diff --git a/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala b/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala deleted file mode 100644 index 542a5557c3..0000000000 --- a/src/test/scala/io/confluent/examples/streams/algebird/CMSStoreTest.scala +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams.algebird - -import com.twitter.algebird.TopCMS -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.serialization.{Serdes, Serializer} -import org.apache.kafka.common.utils.LogContext -import org.apache.kafka.streams.processor.StateStoreContext -import org.apache.kafka.streams.processor.internals.{InternalProcessorContext, MockStreamsMetrics} -import org.apache.kafka.streams.state.KeyValueStoreTestDriver -import org.apache.kafka.streams.state.internals.ThreadCache -import org.apache.kafka.test.{InternalMockProcessorContext, MockRecordCollector, TestUtils} -import org.assertj.core.api.Assertions.assertThat -import org.junit._ -import org.scalatestplus.junit.AssertionsForJUnit -import org.scalatestplus.mockito.MockitoSugar - -import scala.collection.MapView - -class CMSStoreTest extends AssertionsForJUnit with MockitoSugar { - - private val anyStoreName = "cms-store" - - private def createTestContext[T](driver: KeyValueStoreTestDriver[Integer, TopCMS[T]] = createTestDriver[T](), - changelogRecords: Option[Seq[(Int, TopCMS[T])]] = None - ): InternalMockProcessorContext[Integer, TopCMS[T]] = { - // Write the records to the store's changelog - changelogRecords.getOrElse(Seq.empty).foreach { case (key, cms) => driver.addEntryToRestoreLog(key, cms) } - - // The processor context is what makes the restore data available to a store during - // the store's initialization, hence this is what we must return back. - val processorContext: InternalMockProcessorContext[Integer, TopCMS[T]] = { - val pc = driver.context.asInstanceOf[InternalMockProcessorContext[Integer, TopCMS[T]]] - pc.setTime(1) - pc - } - processorContext - } - - private def createTestDriver[T](): KeyValueStoreTestDriver[Integer, TopCMS[T]] = { - val keySerde = Serdes.Integer() - val cmsSerde = TopCMSSerde[T] - KeyValueStoreTestDriver.create[Integer, TopCMS[T]]( - keySerde.serializer(), - keySerde.deserializer(), - cmsSerde.serializer(), - cmsSerde.deserializer()) - } - - @Test - def shouldBeChangeloggingByDefault(): Unit = { - val store = new CMSStore[String](anyStoreName) - assertThat(store.loggingEnabled).isTrue - } - - @Test - def shouldBeNonPersistentStore(): Unit = { - val store = new CMSStore[String](anyStoreName) - assertThat(store.persistent).isFalse - } - - @Test - def shouldBeClosedBeforeInit(): Unit = { - val store = new CMSStore[String](anyStoreName) - assertThat(store.isOpen).isFalse - } - - @Test - def shouldBeOpenAfterInit(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName) - val processorContext = createTestContext[String]() - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // Then - assertThat(store.isOpen).isTrue - } - - @Test - def shouldBeClosedAfterClosing(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName) - - // When - store.close() - - // Then - assertThat(store.isOpen).isFalse - } - - - @Test - def shouldExactlyCountSmallNumbersOfItems(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName) - val items = Seq( - ("foo", System.currentTimeMillis()), - ("bar", System.currentTimeMillis()), - ("foo", System.currentTimeMillis()), - ("foo", System.currentTimeMillis()), - ("quux", System.currentTimeMillis()), - ("bar", System.currentTimeMillis()), - ("foor", System.currentTimeMillis())) - val processorContext = createTestContext() - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // When - items.foreach(x => store.put(x._1, x._2)) - - // Note: We intentionally do not flush the store in this test. - - // Then - assertThat(store.totalCount).isEqualTo(items.size) - assertThat(store.heavyHitters).isEqualTo(items.map(x => x._1).toSet) - val expWordCounts: MapView[String, Int] = items.map(x => x._1).groupBy(identity).view.mapValues(_.length) - expWordCounts.foreach { case (word, count) => assertThat(store.get(word)).isEqualTo(count) } - } - - @Test - def shouldBackupToChangelogIfLoggingIsEnabled(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val processorContext = createTestContext(driver) - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // When - val items = Seq( - ("one", System.currentTimeMillis()), - ("two", System.currentTimeMillis()), - ("three", System.currentTimeMillis())) - items.foreach(x => store.put(x._1, x._2)) - store.flush() - - // Then - val cms: TopCMS[String] = store.cmsFrom(items.map(x => x._1)) - assertThat(driver.flushedEntryStored(store.changelogKey)).isEqualTo(cms) - assertThat(driver.flushedEntryRemoved(store.changelogKey)).isFalse - } - - @Test - def shouldBackupToChangelogOnlyOnFlush(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val observedChangelogRecords = new java.util.ArrayList[ProducerRecord[_, _]] - val processorContext = { - // We must use a "spying" RecordCollector because, unfortunately, Kafka's - // KeyValueStoreTestDriver is not providing any such facilities. - val observingCollector: MockRecordCollector = new MockRecordCollector() { - override def send[K, V](topic: String, - key: K, - value: V, - headers: Headers, - partition: Integer, - timestamp: java.lang.Long, - keySerializer: Serializer[K], - valueSerializer: Serializer[V], - processorNodeId: String, - internalContext: InternalProcessorContext[Void,Void]): Unit = { - observedChangelogRecords.add(new ProducerRecord[K, V](topic, partition, timestamp, key, value)) - } - } - val cache: ThreadCache = new ThreadCache(new LogContext("test"), 1024, new MockStreamsMetrics(new Metrics)) - val context = new InternalMockProcessorContext(TestUtils.tempDirectory, Serdes.Integer(), TopCMSSerde[String], observingCollector, cache) - context.setTime(1) - context - } - store.init(processorContext.asInstanceOf[StateStoreContext], store) - val items = Seq( - ("one", System.currentTimeMillis()), - ("two", System.currentTimeMillis()), - ("three", System.currentTimeMillis()), - ("four", System.currentTimeMillis()), - ("firve", System.currentTimeMillis())) - - // When - items.foreach(x => store.put(x._1, x._2)) - // Then - assertThat(observedChangelogRecords.size()).isEqualTo(0) - - // When - store.flush() - // Then - assertThat(observedChangelogRecords.size()).isEqualTo(1) - } - - @Test - def shouldNotBackupToChangelogIfLoggingIsDisabled(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val processorContext = createTestContext(driver) - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = false) - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // When - val items = Seq( - ("one", System.currentTimeMillis()), - ("two", System.currentTimeMillis()), - ("three", System.currentTimeMillis())) - items.foreach(x => store.put(x._1, x._2)) - store.flush() - - // Then - assertThat(driver.flushedEntryStored(store.changelogKey)).isNull() - assertThat(driver.flushedEntryRemoved(store.changelogKey)).isFalse - } - - @Test - def shouldRestoreFromEmptyChangelog(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val processorContext = createTestContext[String](driver) - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - processorContext.restore(store.name, driver.restoredEntries()) - - // Then - assertThat(store.totalCount).isZero - assertThat(store.heavyHitters.size).isZero - } - - @Test - def shouldRestoreFromNonEmptyChangelog(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val items: Seq[String] = Seq("foo", "bar", "foo", "foo", "quux", "bar", "foo") - val processorContext: InternalMockProcessorContext[Integer, TopCMS[String]] = { - val changelogKeyDoesNotMatter = 123 - val cms: TopCMS[String] = store.cmsFrom(items) - val changelogRecords = Seq((changelogKeyDoesNotMatter, cms)) - createTestContext(driver, changelogRecords = Some(changelogRecords)) - } - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - processorContext.restore(store.name, driver.restoredEntries()) - - // Then - val expWordCounts: MapView[String, Int] = items.groupBy(identity).view.mapValues(_.length) - expWordCounts.foreach { case (word, count) => assertThat(store.get(word)).isEqualTo(count) } - } - - @Test - def shouldRestoreFromChangelogTombstone(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val processorContext: InternalMockProcessorContext[Integer, TopCMS[String]] = { - val changelogKeyDoesNotMatter = 123 - val tombstone: TopCMS[String] = null - val changelogRecords = Seq((changelogKeyDoesNotMatter, tombstone)) - createTestContext(driver, changelogRecords = Some(changelogRecords)) - } - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - processorContext.restore(store.name, driver.restoredEntries()) - - // Then - assertThat(store.totalCount).isZero - assertThat(store.heavyHitters.size).isZero - } - - @Test - def shouldRestoreFromLatestChangelogRecordOnly(): Unit = { - // Given - val driver: KeyValueStoreTestDriver[Integer, TopCMS[String]] = createTestDriver[String]() - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = true) - val expectedItems = Seq("foo", "bar", "foo", "foo", "quux", "bar", "foo") - val unexpectedItems1 = Seq("something", "entirely", "different") - val unexpectedItems2 = Seq("even", "more", "different") - val processorContext: InternalMockProcessorContext[Integer, TopCMS[String]] = { - val cms = store.cmsFrom(expectedItems) - val differentCms1 = store.cmsFrom(unexpectedItems1) - val differentCms2 = store.cmsFrom(unexpectedItems2) - val sameKey = 123 - val differentKey = 456 - val changelogRecords = Seq( - (sameKey, differentCms1), - (differentKey, differentCms2), - (sameKey, cms) /* latest entry in the changelog should "win" */ - ) - createTestContext(driver, changelogRecords = Some(changelogRecords)) - } - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - processorContext.restore(store.name, driver.restoredEntries()) - - // Then - val expWordCounts: MapView[String, Int] = expectedItems.groupBy(identity).view.mapValues(_.length) - expWordCounts.foreach { case (word, count) => assertThat(store.get(word)).isEqualTo(count) } - // Note: The asserts below work only because, given the test setup, we are sure not to run into - // CMS hash collisions that would lead to non-zero counts even for un-counted items. - unexpectedItems1.toSet[String].foreach(item => assertThat(store.get(item)).isEqualTo(0)) - unexpectedItems2.toSet[String].foreach(item => assertThat(store.get(item)).isEqualTo(0)) - } - - @Test - def shouldNotRestoreFromChangelogIfLoggingIsDisabled(): Unit = { - // Given - val store: CMSStore[String] = new CMSStore[String](anyStoreName, loggingEnabled = false) - val items: Seq[String] = Seq("foo", "bar", "foo", "foo", "quux", "bar", "foo") - val processorContext: InternalMockProcessorContext[Integer, TopCMS[String]] = { - val changelogKeyDoesNotMatter = 123 - val cms: TopCMS[String] = store.cmsFrom(items) - val changelogRecords = Seq((changelogKeyDoesNotMatter, cms)) - createTestContext(changelogRecords = Some(changelogRecords)) - } - - // When - store.init(processorContext.asInstanceOf[StateStoreContext], store) - - // Then - assertThat(store.totalCount).isZero - assertThat(store.heavyHitters.size).isZero - } - -} \ No newline at end of file diff --git a/src/test/scala/io/confluent/examples/streams/algebird/TopCMSSerdeTest.scala b/src/test/scala/io/confluent/examples/streams/algebird/TopCMSSerdeTest.scala deleted file mode 100644 index ad99de917a..0000000000 --- a/src/test/scala/io/confluent/examples/streams/algebird/TopCMSSerdeTest.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams.algebird - -import com.twitter.algebird.{TopCMS, TopPctCMS, TopPctCMSMonoid} -import org.apache.kafka.common.errors.SerializationException -import org.apache.kafka.common.serialization.Serde -import org.assertj.core.api.Assertions.assertThat -import org.junit._ -import org.scalatestplus.junit.AssertionsForJUnit -import org.scalatestplus.mockito.MockitoSugar - -class TopCMSSerdeTest extends AssertionsForJUnit with MockitoSugar { - - private val anyTopic = "any-topic" - - @Test - def shouldRoundtrip(): Unit = { - // Given - val cmsMonoid: TopPctCMSMonoid[String] = { - val delta: Double = 1E-10 - val eps: Double = 0.001 - val seed: Int = 1 - val heavyHittersPct: Double = 0.01 - TopPctCMS.monoid[String](eps, delta, seed, heavyHittersPct) - } - val itemToBeCounted = "count-me" - val cms: TopCMS[String] = cmsMonoid.create(itemToBeCounted) - val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - - // When - val bytes: Array[Byte] = serde.serializer().serialize(anyTopic, cms) - val restoredCms = serde.deserializer().deserialize(anyTopic, bytes) - - // Then - assertThat(restoredCms).isEqualTo(cms) - assertThat(restoredCms.frequency(itemToBeCounted).estimate).isEqualTo(cms.frequency(itemToBeCounted).estimate) - } - - @Test - def shouldSerializeNullToNull(): Unit = { - val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - assertThat(serde.serializer.serialize(anyTopic, null)).isNull() - serde.close() - } - - @Test - def shouldDeserializeNullToNull(): Unit = { - val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - assertThat(serde.deserializer.deserialize(anyTopic, null)).isNull() - serde.close() - } - - @Test - def shouldThrowSerializationExceptionWhenDeserializingZeroBytes(): Unit = { - val serde: Serde[TopCMS[String]] = TopCMSSerde[String] - try { - serde.deserializer.deserialize(anyTopic, new Array[Byte](0)) - fail("Should have thrown a SerializationException because of zero input bytes"); - } - catch { - case _: SerializationException => // Ignore (there's no contract on the details of the exception) - } - serde.close() - } - -} \ No newline at end of file