diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala
index 513a90dba..f7c6237f6 100644
--- a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala
+++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala
@@ -403,7 +403,6 @@ class CassandraPartition(index: ChunkIDPartitionChunkIndex,
     val (rangeQuery, infosSkips) = method match {
       case AllChunkScan             => (true, index.allChunks.toSeq)
       case RowKeyChunkScan(k1, k2)  => (false, index.rowKeyRange(k1.binRec, k2.binRec).toSeq)
-      case SingleChunkScan(key, id) => (false, index.singleChunk(key.binRec, id).toSeq)
       case LastSampleChunkScan      => (false, index.latestN(1).toSeq)
     }
     logger.debug(s"Reading chunks from columns ${columnIds.toList}, ${index.binPartition}, method $method")
diff --git a/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraBackedTimeSeriesPartitionSpec.scala b/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraBackedTimeSeriesPartitionSpec.scala
index dee1b2fa1..e282181bc 100644
--- a/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraBackedTimeSeriesPartitionSpec.scala
+++ b/cassandra/src/test/scala/filodb.cassandra/columnstore/CassandraBackedTimeSeriesPartitionSpec.scala
@@ -39,10 +39,10 @@ class CassandraBackedTimeSeriesPartitionSpec extends TimeSeriesPartitionSpec wit
     val start: BinaryRecord = BinaryRecord(dataset1, Seq(now))
     val end: BinaryRecord = BinaryRecord(dataset1, Seq(now + 20000 - 100))   // query for 2 chunks
     val scan = RowKeyChunkScan(start, end)
-    val colIds = Array(0,1)
+    val colIds = Array(0, 1)
     val readers1 = part.readers(scan, colIds).toList
-    // we should see 20 rows in two chunks, then add one for the latest unencoded chunk
-    readers1.size shouldBe 3
+    // we should see 20 rows in two chunks. No data ingested means no chunks for write buffer
+    readers1.size shouldBe 2
     readers1.map(_.rowIterator().size).sum shouldEqual 20
     val readTuples = readers1.flatMap(_.rowIterator().map(r => (r.getLong(0), r.getDouble(1))))
     val ingestedTuples = data.take(20).toList.map(r => (r.getLong(0), r.getDouble(1)))
@@ -50,14 +50,14 @@ class CassandraBackedTimeSeriesPartitionSpec extends TimeSeriesPartitionSpec wit

     // now query all chunks
     val readers2 = part.readers(AllChunkScan, colIds).toList
-    // we should see at least 4 chunks, each with 10 rows
-    readers2.size should be > 4
+    // we should see exactly 4 chunks, each with 10 rows (since we didn't ingest new data)
+    readers2.size shouldEqual 4
     readers2.map(_.rowIterator().size).sum shouldEqual 40   // since it may include rows from earlier runs

     // now test streamReaders
     val readers3 = part.streamReaders(scan, colIds).toListL.runAsync.futureValue
-    // we should see 20 rows in two chunks, then add one for the latest unencoded chunk
-    readers3.size shouldBe 3
+    // we should see 20 rows in two chunks
+    readers3.size shouldBe 2
     readers3.map(_.rowIterator().size).sum shouldEqual 20
   }
diff --git a/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala b/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala
index f29a6a79d..cca5f938c 100644
--- a/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala
+++ b/coordinator/src/main/scala/filodb.coordinator/IngestionActor.scala
@@ -143,6 +143,7 @@ private[filodb] final class IngestionActor(dataset: Dataset,
   private def normalIngestion(shard: Int, offset: Option[Long], startingGroupNo: Int): Unit = {
     create(shard, offset) map { ingestionStream =>
       val stream = ingestionStream.get
+      logger.info(s"Starting normal/active ingestion for shard $shard at offset $offset")
       clusterActor ! IngestionStarted(dataset.ref, shard, context.parent)
       streamSubscriptions(shard) = memStore.ingestStream(dataset.ref, shard, stream,
                                                          flushStream(startingGroupNo)) {
@@ -150,10 +151,11 @@ private[filodb] final class IngestionActor(dataset: Dataset,
       }
       // On completion of the future, send IngestionStopped
       // except for noOpSource, which would stop right away, and is used for sending in tons of data
-      streamSubscriptions(shard).foreach { x =>
+      // also: small chance of a race condition here due to the remove call in the stop() method
+      streamSubscriptions.get(shard).map(_.foreach { x =>
        if (source != NodeClusterActor.noOpSource) clusterActor ! IngestionStopped(dataset.ref, shard)
        ingestionStream.teardown()
-      }
+      })
    } recover {
      case NonFatal(t) => handleError(dataset.ref, shard, t)
    }
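The switch above from streamSubscriptions(shard).foreach to streamSubscriptions.get(shard).map(_.foreach {...}) is what tolerates the remove call in stop(): a direct apply throws if the key is already gone, while get(...) simply yields None. A minimal standalone sketch of that pattern, using plain Scala Futures and hypothetical names rather than code from this patch:

    import scala.collection.concurrent.TrieMap
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object SafeLookupSketch extends App {
      val subscriptions = TrieMap[Int, Future[Unit]](1 -> Future.successful(()))
      val shard = 2  // imagine stop() already removed this shard's subscription

      // subscriptions(shard).foreach(...) would throw NoSuchElementException here;
      // get(...).map(_.foreach(...)) just does nothing when the key is gone.
      subscriptions.get(shard).map(_.foreach { _ => println(s"stream for shard $shard completed") })
    }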
diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala
index 5b35d0d69..0c3569462 100644
--- a/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala
+++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala
@@ -1,24 +1,25 @@
 package filodb.core.memstore

 import java.lang.management.ManagementFactory
+import java.util.concurrent.ConcurrentSkipListMap

 import scala.concurrent.duration._

 import com.typesafe.scalalogging.StrictLogging
 import monix.eval.Task
 import monix.reactive.Observable
-import org.jctools.maps.NonBlockingHashMapLong
 import scalaxy.loops._

 import filodb.core.Iterators
 import filodb.core.Types._
 import filodb.core.binaryrecord.BinaryRecord
 import filodb.core.metadata.Dataset
-import filodb.core.query.{ChunkIDPartitionChunkIndex, ChunkSetReader}
+import filodb.core.query.ChunkSetReader
 import filodb.core.store._
 import filodb.memory.{BlockHolder, ReclaimListener}
 import filodb.memory.format._

+final case class InfoChunks(info: ChunkSetInfo, chunks: Array[BinaryVector[_]])

 /**
  * A MemStore Partition holding chunks of data for different columns (a schema) for time series use cases.
@@ -50,39 +51,27 @@ class TimeSeriesPartition(val dataset: Dataset,
                           bufferPool: WriteBufferPool,
                           val shardStats: TimeSeriesShardStats) extends FiloPartition with StrictLogging {
   import ChunkSetInfo._
-  /**
-   * This is a map from chunkId to the Array of optimized/frozen chunks(BinaryVector) corresponding to that chunkId.
-   *
-   * NOTE: private final compiles down to a field in bytecode, faster than method invocation
-   */
-  private final val vectors = new NonBlockingHashMapLong[Array[BinaryVector[_]]](32, false)
+  import collection.JavaConverters._

-  /**
-   * As new chunks are initialized in this partition, their ids are appended to this queue
-   */
-  private final val chunkIDs = new collection.mutable.Queue[ChunkID]
+  private val nullAppenders = UnsafeUtils.ZeroPointer.asInstanceOf[Array[RowReaderAppender]]
+  private val nullChunks = UnsafeUtils.ZeroPointer.asInstanceOf[Array[BinaryAppendableVector[_]]]

-  /**
-   * This is the index enabling queries to be done on the ingested data. Allows for
-   * query by chunkId, rowKey etc. See [[filodb.core.memstore.TimeSeriesPartition#readers]]
-   * for how this is done.
-   *
-   * This only holds immutable, finished chunks.
-   *
-   */
-  private final val index = new ChunkIDPartitionChunkIndex(binPartition, dataset)
-  // Set initial size to a fraction of the max chunk size, so that partitions with sparse amount of data
-  // will not cause too much memory bloat. GrowableVector allows vectors to grow, so this should be OK
-  private val (initAppenders, initCurChunks) = bufferPool.obtain()
+  /**
+   * This is the ONE main data structure holding the chunks of a TSPartition. It is appended to as chunks arrive
+   * and are encoded. The last or most recent item should be the write buffer chunks.
+   *    Key(ChunkID) -> Value InfoChunks(ChunkSetInfo, Array[BinaryVector])
+   * Thus it is sorted by increasing chunkID, yet addressable by chunkID, and concurrent.
+   */
+  private var infosChunks = new ConcurrentSkipListMap[ChunkID, InfoChunks].asScala

   /**
    * Ingested data goes into this appender. There is one appender for each column in the dataset.
    * Var mutates when buffers are switched for optimization. During switching of buffers
    * in [[filodb.core.memstore.TimeSeriesPartition#switchBuffers]], current var
-   * value is assigned to flushingAppenders, and new appender that is added to the partition is assigned to this var.
+   * value is assigned to flushingAppenders, and nullAppenders is assigned until more data arrives.
    */
-  private var appenders = initAppenders
+  private var appenders = nullAppenders

   /**
    * This is essentially the chunks (binaryVectors) associated with 'appenders' member of this class. This var will
@@ -92,17 +81,17 @@ class TimeSeriesPartition(val dataset: Dataset,
    * in [[filodb.core.memstore.TimeSeriesPartition#switchBuffers]],
    * and new chunk is added to the partition.
    */
-  private var currentChunks = initCurChunks
+  private var currentChunks = nullChunks

   /**
    * This var holds the next appender to be optimized and persisted.
    * Mutates when buffers are switched for optimization in [[filodb.core.memstore.TimeSeriesPartition#switchBuffers]].
    * There is one element for each column of the dataset.
-   * Initialized to ZeroPointer since at the beginning nothing is ready to be flushed
+   * At the beginning nothing is ready to be flushed, so set to null
    */
-  private var flushingAppenders = UnsafeUtils.ZeroPointer.asInstanceOf[Array[RowReaderAppender]]
+  private var flushingAppenders = nullAppenders

-   /**
+  /**
    * Holds the id of the chunks in flushingAppender that should be flushed next.
    * Mutates when buffers are switched for optimization in [[filodb.core.memstore.TimeSeriesPartition#switchBuffers]].
    *
@@ -114,7 +103,7 @@ class TimeSeriesPartition(val dataset: Dataset,
   /**
    * Number of columns in the dataset
   */
-  private final val numColumns = appenders.size
+  private final val numColumns = dataset.dataColumns.length

   private val jvmStartTime = ManagementFactory.getRuntimeMXBean.getStartTime

@@ -125,39 +114,33 @@ class TimeSeriesPartition(val dataset: Dataset,
   private var partitionLoadedFromPersistentStore = false

-  // Not used for now
-  // private final val partitionVectors = new Array[FiloVector[_]](dataset.partitionColumns.length)
-
-  // Add initial write buffers as the first chunkSet/chunkID
-  initNewChunk()
-
   /**
    * Ingests a new row, adding it to currentChunks.
    * Note that it is the responsibility of flush() to ensure the right currentChunks is allocated.
    */
   def ingest(row: RowReader, offset: Long): Unit = {
+    if (appenders == nullAppenders) initNewChunk()
     for { col <- 0 until numColumns optimized } {
       appenders(col).append(row)
     }
   }

   /**
-   * Atomically switches the writeBuffers/appenders to a new empty one.
+   * Atomically switches the writeBuffers/appenders to a null one. If and when we get more data, then
+   * we will initialize the appenders to new ones. This way, dead partitions that are not getting more data
+   * will not waste empty appenders.
    * The old writeBuffers/chunks becomes flushingAppenders.
    * In theory this may be called from another thread from ingest(), but then ingest() may continue to write
    * to the old flushingAppenders buffers until the concurrent ingest() finishes.
    * To guarantee no more writes happen when switchBuffers is called, have ingest() and switchBuffers() be
    * called from a single thread / single synchronous stream.
    */
-  def switchBuffers(): Unit = if (currentChunks(0).length > 0) {
-    // Get new write buffers from pool
+  def switchBuffers(): Unit = if (latestChunkLen > 0) {
     flushingAppenders = appenders
-    flushingChunkID = chunkIDs.last
-    val newAppendersAndChunks = bufferPool.obtain()
-    // Right after this all ingest() calls will append to new chunks
-    appenders = newAppendersAndChunks._1
-    currentChunks = newAppendersAndChunks._2
-    initNewChunk()   // At this point the new buffers can be read from
+    flushingChunkID = infosChunks.keys.last
+    // Right after this all ingest() calls will check and potentially append to new chunks
+    appenders = nullAppenders
+    currentChunks = nullChunks
   }

   /**
@@ -183,7 +166,7 @@ class TimeSeriesPartition(val dataset: Dataset,
         //This assumption cannot break. We should ensure one vector can be written
         //to one block always atleast as per the current design. We want a flush group
         //to typically end in atmost 2 blocks.
-        //TODO Check if this is alright
+        //TODO: Break up chunks longer than blockAllocationSize into smaller ones
         assert(blockHolder.blockAllocationSize() > appender.appender.frozenSize)
         val optimized = appender.appender.optimize(blockHolder)
         shardStats.encodedBytes.increment(optimized.numBytes)
@@ -193,29 +176,28 @@ class TimeSeriesPartition(val dataset: Dataset,
       shardStats.numSamplesEncoded.increment(numSamples)
       shardStats.numChunksEncoded.increment(frozenVectors.length)

-      // replace appendableVectors reference in vectors hash with compacted, immutable chunks
-      vectors.put(flushingChunkID, frozenVectors)
-
-      // release older appenders back to pool. Nothing at this point should reference the older appenders.
-      bufferPool.release(flushingAppenders)
-      flushingAppenders = UnsafeUtils.ZeroPointer.asInstanceOf[Array[RowReaderAppender]]
-
       // Create ChunkSetInfo
       val reader = new FastFiloRowReader(frozenVectors.asInstanceOf[Array[FiloVector[_]]])
       reader.setRowNo(0)
       val firstRowKey = dataset.rowKey(reader)
       reader.setRowNo(numSamples - 1)
       val lastRowKey = dataset.rowKey(reader)
-      val chunkInfo = ChunkSetInfo(flushingChunkID, numSamples, firstRowKey, lastRowKey)
-      index.add(chunkInfo, Nil)
+      val chunkInfo = infosChunks(flushingChunkID).info.copy(numRows = numSamples,
+                                                             firstKey = firstRowKey,
+                                                             lastKey = lastRowKey)
+      // replace write buffers / vectors with compacted, immutable chunks
+      infosChunks.put(flushingChunkID, InfoChunks(chunkInfo, frozenVectors))
+
+      // release older appenders back to pool. Nothing at this point should reference the older appenders.
+      bufferPool.release(flushingAppenders)
+      flushingAppenders = nullAppenders

       blockHolder.endPartition(new ReclaimListener {
         //It is very likely that a flushChunk ends only in one block. At worst in may end up in a couple.
         //So a blockGroup contains atmost 2. When any one Block in the flushGroup is evicted the flushChunk is removed.
        //So if and when a second block gets reclaimed this is a no-op
         override def onReclaim(): Unit = {
-          vectors.remove(chunkInfo.id)
-          index.remove(chunkInfo.id)
+          infosChunks.remove(chunkInfo.id)
           shardStats.chunkIdsEvicted.increment()
         }
       })
@@ -225,30 +207,11 @@
     }
   }

-  def latestN(n: Int): InfosSkipsIt =
-    if (latestChunkLen > 0) { latestChunkIt ++ index.latestN(n - 1) }
-    else { index.latestN(n) }
-
-  def numChunks: Int = chunkIDs.size
-  def latestChunkLen: Int = currentChunks(0).length
-
-  /**
-   * Gets the most recent n ChunkSetInfos and skipMaps (which will be empty)
-   */
-  def newestChunkIds(n: Int): InfosSkipsIt = {
-    val latest = latestChunkIt
-    val numToTake = if (latest.isEmpty) n else (n - 1)
-    index.latestN(numToTake) ++ latest
-  }
-
-  private def latestChunkInfo: ChunkSetInfo =
-    ChunkSetInfo(chunkIDs.last, latestChunkLen, BinaryRecord.empty, BinaryRecord.empty)
-
-  private def latestChunkIt: InfosSkipsIt = Iterator.single((latestChunkInfo, emptySkips))
+  def numChunks: Int = infosChunks.size
+  def latestChunkLen: Int = if (currentChunks != nullChunks) currentChunks(0).length else 0

   private def getVectors(columnIds: Array[Int],
-                         vectors: Array[BinaryVector[_]],
-                         vectLength: Int): Array[FiloVector[_]] = {
+                         vectors: Array[BinaryVector[_]]): Array[FiloVector[_]] = {
     val finalVectors = new Array[FiloVector[_]](columnIds.size)
     for { i <- 0 until columnIds.size optimized } {
       finalVectors(i) = if (Dataset.isPartitionID(columnIds(i))) { constPartitionVector(columnIds(i)) }
@@ -258,23 +221,31 @@
   private def readersFromMemory(method: ChunkScanMethod, columnIds: Array[Int]): Iterator[ChunkSetReader] = {
-    val infosSkips = method match {
-      case AllChunkScan             => index.allChunks ++ latestChunkIt
+    val infosVects = method match {
+      case AllChunkScan =>
+        infosChunks.values.toIterator
       // To derive time range: r.startkey.getLong(0) -> r.endkey.getLong(0)
-      case r: RowKeyChunkScan       => index.rowKeyRange(r.startkey, r.endkey) ++ latestChunkIt
-      case r @ SingleChunkScan(_, id) => index.singleChunk(r.startkey, id)
-      case LastSampleChunkScan      => latestN(1)
+      case r: RowKeyChunkScan =>
+        infosChunks.values.toIterator.filter { ic =>
+          ic.info.firstKey.isEmpty ||   // empty key = most recent chunk, get a free pass
+          ic.info.intersection(r.startkey, r.endkey).isDefined
+        }
+      case LastSampleChunkScan =>
+        if (infosChunks.isEmpty) Iterator.empty else Iterator.single(infosChunks.values.last)
     }
-    infosSkips.map { case (info, skips) =>
-      val vectArray = vectors.get(info.id)
+    infosVects.map { case InfoChunks(info, vectArray) =>
       //scalastyle:off
-      require(vectArray != null,
-              s"INCONSISTENCY! vectArray is null, ${info.id} does not exist in vectors but $info is in index")
+      require(vectArray != null, "INCONSISTENCY! vectArray is null!")
       //scalastyle:on
-      val chunkset = getVectors(columnIds, vectArray, info.numRows)
+      val realInfo = info match {
+        // First chunk, fill in real chunk length. TODO: also fill in current first & last key
+        case i @ ChunkSetInfo(_, -1, _, _) => i.copy(numRows = latestChunkLen)
+        case _: Any                        => info
+      }
+      val chunkset = getVectors(columnIds, vectArray)
       shardStats.numChunksQueried.increment(chunkset.length)
-      new ChunkSetReader(info, skips, chunkset)
+      new ChunkSetReader(realInfo, emptySkips, chunkset)
     }
   }

@@ -282,12 +253,16 @@

   /**
    * Initializes vectors, chunkIDs for a new chunkset/chunkID.
-   * This is called once every chunk-duration for each group, when the buffers are switched for that group
+   * This is called after switchBuffers() upon the first data that arrives.
+   * Also compacts infosChunks tombstones.
    */
   private def initNewChunk(): Unit = {
-    val newChunkID = timeUUID64
-    vectors.put(newChunkID, currentChunks.asInstanceOf[Array[BinaryVector[_]]])
-    chunkIDs += newChunkID
+    val (newAppenders, newCurChunks) = bufferPool.obtain()
+    appenders = newAppenders
+    currentChunks = newCurChunks
+    val newChunkId = timeUUID64
+    val newInfo = ChunkSetInfo(newChunkId, -1, BinaryRecord.empty, BinaryRecord.empty)
+    infosChunks.put(newChunkId, InfoChunks(newInfo, currentChunks.asInstanceOf[Array[BinaryVector[_]]]))
   }

   /**
@@ -345,8 +320,7 @@
       .flatMap(_.streamReaders(AllChunkScan, dataset.dataColumns.map(_.id).toArray))   // all chunks, all columns
     readers.map { r =>
       // TODO r.vectors is on-heap. It should be copied to the offheap store before adding to index.
-      vectors.put(r.info.id, r.vectors.map(_.asInstanceOf[BinaryVector[_]]))
-      index.add(r.info, Nil)
+      infosChunks.put(r.info.id, InfoChunks(r.info, r.vectors.map(_.asInstanceOf[BinaryVector[_]])))
     }.countL.map { count =>
       shardStats.partitionsPagedFromColStore.increment()
       shardStats.chunkIdsPagedFromColStore.increment(count)
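The core of the TimeSeriesPartition change above is that the chunk-ID queue, the NonBlockingHashMapLong of vectors, and the separate ChunkIDPartitionChunkIndex are folded into one sorted, concurrent infosChunks map, whose newest entry is the still-unencoded write buffer (numRows stays -1 until makeFlushChunks fills it in). A rough standalone sketch of that shape, with simplified stand-in types and hypothetical names instead of the real ChunkSetInfo and BinaryVector classes:

    import java.util.concurrent.ConcurrentSkipListMap
    import scala.collection.JavaConverters._

    object InfosChunksSketch extends App {
      type ChunkID = Long
      final case class Info(id: ChunkID, numRows: Int)             // stand-in for ChunkSetInfo
      final case class InfoChunks(info: Info, chunks: Array[Int])  // stand-in for the frozen vectors

      // Sorted by ascending chunk ID, addressable by ID, and safe for concurrent readers.
      val infosChunks = new ConcurrentSkipListMap[ChunkID, InfoChunks].asScala

      infosChunks.put(1L, InfoChunks(Info(1L, 10), Array(1, 2, 3)))  // an encoded chunk
      infosChunks.put(2L, InfoChunks(Info(2L, -1), Array(4)))        // the write buffer, not yet encoded

      println(infosChunks.keys.last)           // 2 -- the most recent chunk, as used by switchBuffers()
      println(infosChunks.values.map(_.info))  // iterated in chunk-ID order, as used by AllChunkScan
    }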
diff --git a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
index 9120d6b2f..bf12671e5 100644
--- a/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
+++ b/core/src/main/scala/filodb.core/memstore/TimeSeriesShard.scala
@@ -17,7 +17,7 @@
 import filodb.core.Types.PartitionKey
 import filodb.core.metadata.Dataset
 import filodb.core.query.PartitionKeyIndex
 import filodb.core.store._
-import filodb.memory.{BlockHolder, MemoryStats, NativeMemoryManager, PageAlignedBlockManager}
+import filodb.memory.{BlockMemFactoryPool, MemoryStats, NativeMemoryManager, PageAlignedBlockManager}
 import filodb.memory.format.{SchemaRowReader, ZeroCopyUTF8String}

@@ -56,6 +56,8 @@ class TimeSeriesShardStats(dataset: DatasetRef, shardNum: Int) {
   val partitionsQueried = Kamon.metrics.counter("memstore-partitions-queried", tags)
   val numChunksQueried = Kamon.metrics.counter("memstore-chunks-queried", tags)
   val memoryStats = new MemoryStats(tags)
+
+  val bufferPoolSize = Kamon.metrics.gauge("memstore-writebuffer-pool-size", tags)(0L)
 }

 // TODO for scalability: get rid of stale partitions?
@@ -120,6 +122,7 @@ class TimeSeriesShard(dataset: Dataset, config: Config, val shardNum: Int, sink:
   // The off-heap block store used for encoded chunks
   protected val blockMemorySize: Long = shardMemoryMB * 1024 * 1024L
   private val blockStore = new PageAlignedBlockManager(blockMemorySize, shardStats.memoryStats, numPagesPerBlock)
+  private val blockFactoryPool = new BlockMemFactoryPool(blockStore)
   private val numColumns = dataset.dataColumns.size

   // The off-heap buffers used for ingesting the newest data samples
@@ -129,6 +132,8 @@
   /**
    * Unencoded/unoptimized ingested data is stored in buffers that are allocated from this off-heap pool
+   * Set initial size to a fraction of the max chunk size, so that partitions with sparse amount of data
+   * will not cause too much memory bloat. GrowableVector allows vectors to grow, so this should be OK
    */
   private val bufferPool = new WriteBufferPool(bufferMemoryManager, dataset, maxChunksSize / 8, maxNumPartitions)
   logger.info(s"Finished initializing memory pools for shard $shardNum")
@@ -219,13 +224,14 @@
       Task.fromFuture(commitCheckpoint(dataset.ref, shardNum, flushGroup))
     }
     taskToReturn.runOnComplete(_ => tracer.finish())
+    shardStats.bufferPoolSize.record(bufferPool.poolSize)
     taskToReturn
   }

   private def doFlushSteps(flushGroup: FlushGroup,
                            partitionIt: Iterator[TimeSeriesPartition]): Task[Response] = {
     // Only allocate the blockHolder when we actually have chunks/partitions to flush
-    val blockHolder = new BlockHolder(blockStore)
+    val blockHolder = blockFactoryPool.checkout()

     // Given the flush group, create an observable of ChunkSets
     val chunkSetIt = partitionIt.flatMap(_.makeFlushChunks(blockHolder))
@@ -259,9 +265,11 @@
     }.map { case resp =>
       logger.info(s"Flush of shard=$shardNum group=$flushGroup response=$resp offset=${_offset}")
       blockHolder.markUsedBlocksReclaimable()
+      blockFactoryPool.release(blockHolder)
       resp
     }.recover { case e =>
       logger.error("Internal Error - should have not reached this state", e)
+      blockFactoryPool.release(blockHolder)
       DataDropped
     }
     Task.fromFuture(taskFuture)
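With the pool in place, doFlushSteps above checks a BlockHolder out of blockFactoryPool and releases it on both the success and the error path, so a partially filled block is reused by the next flush instead of being stranded. A simplified standalone sketch of that checkout/release discipline, with a generic stand-in pool and hypothetical names (the real code releases inside the Future's map/recover callbacks rather than a finally block):

    import scala.collection.mutable

    // A tiny generic pool with the same checkout/return semantics as BlockMemFactoryPool.
    class SimplePool[T](makeNew: () => T) {
      private val q = mutable.Queue[T]()
      def poolSize: Int = q.length
      def checkout(): T = synchronized { if (q.nonEmpty) q.dequeue() else makeNew() }
      def release(item: T): Unit = synchronized { q.enqueue(item) }
    }

    object PoolUsageSketch extends App {
      val pool = new SimplePool[StringBuilder](() => new StringBuilder)

      def flushOnce(): Unit = {
        val holder = pool.checkout()  // reuse a previously returned, possibly half-filled holder
        try {
          holder.append("x")          // stand-in for writing encoded chunks into blocks
        } finally {
          pool.release(holder)        // always return it, on success or failure
        }
      }

      flushOnce(); flushOnce()
      println(pool.poolSize)          // 1 -- the same holder was checked out both times
    }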
diff --git a/core/src/main/scala/filodb.core/query/Aggregate.scala b/core/src/main/scala/filodb.core/query/Aggregate.scala
index 90ce470c0..d3ef407a9 100644
--- a/core/src/main/scala/filodb.core/query/Aggregate.scala
+++ b/core/src/main/scala/filodb.core/query/Aggregate.scala
@@ -2,6 +2,7 @@ package filodb.core.query

 import scala.reflect.{classTag, ClassTag}

+import com.typesafe.scalalogging.StrictLogging
 import enumeratum.{Enum, EnumEntry}
 import enumeratum.EnumEntry.Snakecase
 import org.scalactic._
@@ -97,14 +98,18 @@ trait OneValueAggregator extends Aggregator {
   def combine(first: A, second: A): A = ???
 }

-trait ChunkAggregator extends Aggregator {
+trait ChunkAggregator extends Aggregator with StrictLogging {
   def add(orig: A, reader: ChunkSetReader): A
   def positions: Array[Int]

-  def aggPartition(method: ChunkScanMethod, partition: FiloPartition): A =
+  def aggPartition(method: ChunkScanMethod, partition: FiloPartition): A = {
+    logger.trace(s"Aggregating partition ${partition.binPartition}")
     partition.readers(method, positions).foldLeft(emptyAggregate) {
-      case (agg, reader) => add(agg, reader)
+      case (agg, reader) =>
+        logger.trace(s"  --> agg=$agg reader=${reader.info}")
+        add(agg, reader)
     }
+  }
 }

 class NumBytesAggregator(vectorPos: Int = 0) extends ChunkAggregator {
diff --git a/core/src/main/scala/filodb.core/query/PartitionChunkIndex.scala b/core/src/main/scala/filodb.core/query/PartitionChunkIndex.scala
index ab699599b..7fb429c0d 100644
--- a/core/src/main/scala/filodb.core/query/PartitionChunkIndex.scala
+++ b/core/src/main/scala/filodb.core/query/PartitionChunkIndex.scala
@@ -45,7 +45,6 @@ trait PartitionChunkIndex {
     method match {
       case AllChunkScan             => allChunks
       case RowKeyChunkScan(k1, k2)  => rowKeyRange(k1.binRec, k2.binRec)
-      case SingleChunkScan(key, id) => singleChunk(key.binRec, id)
       case LastSampleChunkScan      => latestN(1)
     }
   }
diff --git a/core/src/main/scala/filodb.core/store/ColumnStore.scala b/core/src/main/scala/filodb.core/store/ColumnStore.scala
index 9104f5c07..1dfdaa152 100644
--- a/core/src/main/scala/filodb.core/store/ColumnStore.scala
+++ b/core/src/main/scala/filodb.core/store/ColumnStore.scala
@@ -2,7 +2,6 @@ package filodb.core.store

 import com.typesafe.scalalogging.StrictLogging

-import filodb.core._
 import filodb.core.binaryrecord.{BinaryRecord, BinaryRecordWrapper}
 import filodb.core.metadata.Dataset
 import filodb.core.query._
@@ -32,10 +31,6 @@ final case class RowKeyChunkScan(firstBinKey: BinaryRecordWrapper,
   def startkey: BinaryRecord = firstBinKey.binRec
   def endkey: BinaryRecord = lastBinKey.binRec
 }
-final case class SingleChunkScan(firstBinKey: BinaryRecordWrapper,
-                                 chunkId: Types.ChunkID) extends ChunkScanMethod {
-  def startkey: BinaryRecord = firstBinKey.binRec
-}
 case object LastSampleChunkScan extends ChunkScanMethod

 object RowKeyChunkScan {
diff --git a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala
index cd576c134..bd4973f27 100644
--- a/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala
+++ b/core/src/test/scala/filodb.core/memstore/TimeSeriesMemStoreSpec.scala
@@ -228,6 +228,7 @@
     // Try reading - should be able to read optimized chunks too
     val splits = memStore.getScanSplits(dataset1.ref, 1)
     val query = QuerySpec("min", AggregationFunction.Sum)
+
     val agg1 = memStore.aggregate(dataset1, query, FilteredPartitionScan(splits.head))
                        .get.runAsync.futureValue
     agg1.result should equal (Array((1 to 100).map(_.toDouble).sum))
diff --git a/core/src/test/scala/filodb.core/memstore/TimeSeriesPartitionSpec.scala b/core/src/test/scala/filodb.core/memstore/TimeSeriesPartitionSpec.scala
index c4a3fd869..2acd750ff 100644
--- a/core/src/test/scala/filodb.core/memstore/TimeSeriesPartitionSpec.scala
+++ b/core/src/test/scala/filodb.core/memstore/TimeSeriesPartitionSpec.scala
@@ -51,13 +51,21 @@ class TimeSeriesPartitionSpec extends FunSpec with Matchers with BeforeAndAfter
     // First 10 rows ingested. Now flush in a separate Future while ingesting the remaining row
    part.switchBuffers()
-    bufferPool.poolSize shouldEqual (origPoolSize - 1)
+    // After switchBuffers, currentChunks should be null, pool size the same (nothing new allocated yet)
+    bufferPool.poolSize shouldEqual origPoolSize
+    part.latestChunkLen shouldEqual 0
+
+    // Before flush happens, should be able to read all chunks
+    part.numChunks shouldEqual 1
+    val chunks1 = part.readers(AllChunkScan, Array(1)).map(_.vectors(0).toSeq).toBuffer
+    chunks1 shouldEqual Seq(minData take 10)
+
     val blockHolder = new BlockHolder(blockStore)
     val flushFut = Future(part.makeFlushChunks(blockHolder))
     data.drop(10).zipWithIndex.foreach { case (r, i) => part.ingest(r, 1100L + i) }
     val chunkSetOpt = flushFut.futureValue

-    // After flush, the old writebuffers should be returned to pool
+    // After flush, the old writebuffers should be returned to pool, but new one allocated for ingesting
     bufferPool.poolSize shouldEqual origPoolSize

     // there should be a frozen chunk of 10 records plus 1 record in currently appending chunks
@@ -83,13 +91,13 @@ class TimeSeriesPartitionSpec extends FunSpec with Matchers with BeforeAndAfter
     // First 10 rows ingested. Now flush in a separate Future while ingesting 6 more rows
     part.switchBuffers()
-    bufferPool.poolSize shouldEqual (origPoolSize - 1)
+    bufferPool.poolSize shouldEqual origPoolSize   // current chunks become null, no new allocation yet
     val blockHolder = new BlockHolder(blockStore)
     val flushFut = Future(part.makeFlushChunks(blockHolder))
     data.drop(10).take(6).zipWithIndex.foreach { case (r, i) => part.ingest(r, 1100L + i) }
     val chunkSetOpt = flushFut.futureValue

-    // After flush, the old writebuffers should be returned to pool
+    // After flush, the old writebuffers should be returned to pool, but new one allocated too
     bufferPool.poolSize shouldEqual origPoolSize

     // there should be a frozen chunk of 10 records plus 6 records in currently appending chunks
@@ -141,12 +149,12 @@ class TimeSeriesPartitionSpec extends FunSpec with Matchers with BeforeAndAfter
     part.switchBuffers()
     val blockHolder = new BlockHolder(blockStore)
     part.makeFlushChunks(blockHolder).isDefined shouldEqual true
-    part.numChunks shouldEqual 2
+    part.numChunks shouldEqual 1
     part.latestChunkLen shouldEqual 0

     // Now, switch buffers again without ingesting more data. Clearly there are no rows, no switch, and no flush.
    part.switchBuffers()
-    part.numChunks shouldEqual 2
+    part.numChunks shouldEqual 1
     part.makeFlushChunks(blockHolder) shouldEqual None

     val minData = data.map(_.getDouble(1))
diff --git a/memory/src/main/scala/filodb.memory/BlockManager.scala b/memory/src/main/scala/filodb.memory/BlockManager.scala
index 61ca50476..f6bc26c4f 100644
--- a/memory/src/main/scala/filodb.memory/BlockManager.scala
+++ b/memory/src/main/scala/filodb.memory/BlockManager.scala
@@ -49,7 +49,6 @@ class MemoryStats(tags: Map[String, String]) {
   val usedBlocksMetric = Kamon.metrics.gauge("blockstore-used-blocks", tags)(0L)
   val freeBlocksMetric = Kamon.metrics.gauge("blockstore-free-blocks", tags)(0L)
   val blocksReclaimedMetric = Kamon.metrics.counter("blockstore-blocks-reclaimed", tags)
-  val blockUtilizationMetric = Kamon.metrics.histogram("blockstore-block-utilized-bytes", tags)
 }

diff --git a/memory/src/main/scala/filodb.memory/BlockMemFactoryPool.scala b/memory/src/main/scala/filodb.memory/BlockMemFactoryPool.scala
new file mode 100644
index 000000000..eb5d581d6
--- /dev/null
+++ b/memory/src/main/scala/filodb.memory/BlockMemFactoryPool.scala
@@ -0,0 +1,29 @@
+package filodb.memory
+
+import com.typesafe.scalalogging.StrictLogging
+
+/**
+ * This class allows BlockMemFactory's to be reused so that the blocks can be fully utilized, instead of left stranded
+ * and half empty.  It has checkout and return semantics.  Multiple parallel tasks each do their own
+ * checkout and return, thus there should be one blockholder outstanding per task.
+ */
+class BlockMemFactoryPool(blockStore: BlockManager) extends StrictLogging {
+  private val factoryPool = new collection.mutable.Queue[BlockHolder]()
+
+  def poolSize: Int = factoryPool.length
+
+  def checkout(): BlockHolder = synchronized {
+    if (factoryPool.nonEmpty) {
+      logger.debug(s"Checking out BlockMemFactory from pool. poolSize=$poolSize")
+      factoryPool.dequeue
+    } else {
+      logger.debug(s"Nothing in BlockMemFactory pool. Creating a new one")
+      new BlockHolder(blockStore)
+    }
+  }
+
+  def release(factory: BlockHolder): Unit = synchronized {
+    logger.debug(s"Returning factory $factory to the pool. New size ${poolSize + 1}")
+    factoryPool += factory
+  }
+}
\ No newline at end of file
diff --git a/memory/src/main/scala/filodb.memory/MemFactory.scala b/memory/src/main/scala/filodb.memory/MemFactory.scala
index a0eff235c..827218679 100644
--- a/memory/src/main/scala/filodb.memory/MemFactory.scala
+++ b/memory/src/main/scala/filodb.memory/MemFactory.scala
@@ -174,9 +174,6 @@ class BlockHolder(blockStore: BlockManager) extends MemFactory with StrictLoggin

   protected def ensureCapacity(forSize: Long): Block = {
     if (!currentBlock.get().hasCapacity(forSize)) {
-      val currentBlockRemaining = currentBlock.get().remaining()
-      val currentBlockCapacity = currentBlock.get().capacity
-      blockStore.stats().blockUtilizationMetric.record(currentBlockCapacity - currentBlockRemaining)
       currentBlock.set(blockStore.requestBlock().get)
       blockGroup += currentBlock.get()
     }