feat(memory, core): Allocate WriteBuffers lazily and reuse BlockHolders during flushes (#151)

* fix(core): Allocate WriteBuffers in TSPartition only when new data arrives.  Saves memory for stale partitions.
* feat(memory,core): BlockMemFactoryPool: reuse blocks between flush tasks to maximize memory usage
* Get rid of memory block utilization metric
* Refactor TimeSeriesPartition data structures to one main SkipListMap.  Much simpler, less memory and easier to reason about
* Fix race condition in IngestionActor;  get rid of SingleChunkScan
Evan Chan authored and Evan Chan committed Mar 3, 2018
1 parent 202711a commit a80c827
Showing 13 changed files with 144 additions and 128 deletions.
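The first two commit bullets describe one pattern: write buffers come from a pool, are requested only when the first row actually arrives, and go back to the pool after each flush so later flush tasks reuse the same memory. A minimal sketch of that pattern (Pool, Buffers, and LazyPartition are illustrative stand-ins, not FiloDB's actual WriteBufferPool API):

import scala.collection.mutable

final class Buffers { val rows = mutable.ArrayBuffer.empty[Long] }

// Free-list pool: released buffers are cleared and handed to later flush tasks
final class Pool {
  private val free = mutable.Stack.empty[Buffers]
  def obtain(): Buffers = if (free.nonEmpty) free.pop() else new Buffers
  def release(b: Buffers): Unit = { b.rows.clear(); free.push(b) }
}

final class LazyPartition(pool: Pool) {
  private var current: Buffers = null              // sentinel: nothing allocated yet

  def ingest(value: Long): Unit = {
    if (current == null) current = pool.obtain()   // allocate on first data only
    current.rows += value
  }

  // Hand the filled buffers to the flusher and return to the unallocated state;
  // a stale partition that never ingests again holds no buffers at all
  def switchBuffers(): Option[Buffers] = {
    val flushing = Option(current)
    current = null
    flushing
  }
}

After a flush task finishes encoding, it would call pool.release(flushing) so the same memory is recycled rather than reallocated, which is the reuse the second bullet refers to.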
@@ -403,7 +403,6 @@ class CassandraPartition(index: ChunkIDPartitionChunkIndex,
val (rangeQuery, infosSkips) = method match {
case AllChunkScan => (true, index.allChunks.toSeq)
case RowKeyChunkScan(k1, k2) => (false, index.rowKeyRange(k1.binRec, k2.binRec).toSeq)
case SingleChunkScan(key, id) => (false, index.singleChunk(key.binRec, id).toSeq)
case LastSampleChunkScan => (false, index.latestN(1).toSeq)
}
logger.debug(s"Reading chunks from columns ${columnIds.toList}, ${index.binPartition}, method $method")
@@ -39,25 +39,25 @@ class CassandraBackedTimeSeriesPartitionSpec extends TimeSeriesPartitionSpec wit
val start: BinaryRecord = BinaryRecord(dataset1, Seq(now))
val end: BinaryRecord = BinaryRecord(dataset1, Seq(now + 20000 - 100)) // query for 2 chunks
val scan = RowKeyChunkScan(start, end)
val colIds = Array(0,1)
val colIds = Array(0, 1)
val readers1 = part.readers(scan, colIds).toList
// we should see 20 rows in two chunks, then add one for the latest unencoded chunk
readers1.size shouldBe 3
// we should see 20 rows in two chunks. No data ingested means no chunks for write buffer
readers1.size shouldBe 2
readers1.map(_.rowIterator().size).sum shouldEqual 20
val readTuples = readers1.flatMap(_.rowIterator().map(r => (r.getLong(0), r.getDouble(1))))
val ingestedTuples = data.take(20).toList.map(r => (r.getLong(0), r.getDouble(1)))
readTuples shouldEqual ingestedTuples

// now query all chunks
val readers2 = part.readers(AllChunkScan, colIds).toList
// we should see at least 4 chunks, each with 10 rows
readers2.size should be > 4
// we should see exactly 4 chunks, each with 10 rows (since we didn't ingest new data)
readers2.size shouldEqual 4
readers2.map(_.rowIterator().size).sum shouldEqual 40 // since it may include rows from earlier runs

// now test streamReaders
val readers3 = part.streamReaders(scan, colIds).toListL.runAsync.futureValue
// we should see 20 rows in two chunks, then add one for the latest unencoded chunk
readers3.size shouldBe 3
// we should see 20 rows in two chunks
readers3.size shouldBe 2
readers3.map(_.rowIterator().size).sum shouldEqual 20

}
@@ -143,17 +143,19 @@ private[filodb] final class IngestionActor(dataset: Dataset,
private def normalIngestion(shard: Int, offset: Option[Long], startingGroupNo: Int): Unit = {
create(shard, offset) map { ingestionStream =>
val stream = ingestionStream.get
logger.info(s"Starting normal/active ingestion for shard $shard at offset $offset")
clusterActor ! IngestionStarted(dataset.ref, shard, context.parent)

streamSubscriptions(shard) = memStore.ingestStream(dataset.ref, shard, stream, flushStream(startingGroupNo)) {
ex => handleError(dataset.ref, shard, ex)
}
// On completion of the future, send IngestionStopped
// except for noOpSource, which would stop right away, and is used for sending in tons of data
streamSubscriptions(shard).foreach { x =>
// also: small chance for race condition here due to remove call in stop() method
streamSubscriptions.get(shard).map(_.foreach { x =>
if (source != NodeClusterActor.noOpSource) clusterActor ! IngestionStopped(dataset.ref, shard)
ingestionStream.teardown()
}
})
} recover { case NonFatal(t) =>
handleError(dataset.ref, shard, t)
}
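The subscription change above swaps direct indexing, streamSubscriptions(shard).foreach(...), for streamSubscriptions.get(shard).map(_.foreach(...)), so that when stop() removes the shard's entry concurrently, the Option-based lookup degrades to a no-op instead of throwing NoSuchElementException. The same pattern in isolation (a sketch; names are illustrative):

import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val subscriptions = TrieMap.empty[Int, Future[Unit]]
val shard = 0
subscriptions(shard) = Future { /* ingestion stream runs here */ }

// Racy: subscriptions(shard).foreach(...) throws NoSuchElementException
// if stop() removed the key between the check and the access.

// Safe: a concurrent remove just yields None and nothing runs
subscriptions.get(shard).map(_.foreach { _ =>
  println(s"shard $shard completed; tearing down its stream")
})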
168 changes: 71 additions & 97 deletions core/src/main/scala/filodb.core/memstore/TimeSeriesPartition.scala
@@ -1,24 +1,25 @@
package filodb.core.memstore

import java.lang.management.ManagementFactory
import java.util.concurrent.ConcurrentSkipListMap

import scala.concurrent.duration._

import com.typesafe.scalalogging.StrictLogging
import monix.eval.Task
import monix.reactive.Observable
import org.jctools.maps.NonBlockingHashMapLong
import scalaxy.loops._

import filodb.core.Iterators
import filodb.core.Types._
import filodb.core.binaryrecord.BinaryRecord
import filodb.core.metadata.Dataset
import filodb.core.query.{ChunkIDPartitionChunkIndex, ChunkSetReader}
import filodb.core.query.ChunkSetReader
import filodb.core.store._
import filodb.memory.{BlockHolder, ReclaimListener}
import filodb.memory.format._

final case class InfoChunks(info: ChunkSetInfo, chunks: Array[BinaryVector[_]])

/**
* A MemStore Partition holding chunks of data for different columns (a schema) for time series use cases.
@@ -50,39 +51,27 @@ class TimeSeriesPartition(val dataset: Dataset,
bufferPool: WriteBufferPool,
val shardStats: TimeSeriesShardStats) extends FiloPartition with StrictLogging {
import ChunkSetInfo._
/**
* This is a map from chunkId to the Array of optimized/frozen chunks(BinaryVector) corresponding to that chunkId.
*
* NOTE: private final compiles down to a field in bytecode, faster than method invocation
*/
private final val vectors = new NonBlockingHashMapLong[Array[BinaryVector[_]]](32, false)
import collection.JavaConverters._

/**
* As new chunks are initialized in this partition, their ids are appended to this queue
*/
private final val chunkIDs = new collection.mutable.Queue[ChunkID]
private val nullAppenders = UnsafeUtils.ZeroPointer.asInstanceOf[Array[RowReaderAppender]]
private val nullChunks = UnsafeUtils.ZeroPointer.asInstanceOf[Array[BinaryAppendableVector[_]]]

/**
* This is the index enabling queries to be done on the ingested data. Allows for
* query by chunkId, rowKey etc. See [[filodb.core.memstore.TimeSeriesPartition#readers]]
* for how this is done.
*
* This only holds immutable, finished chunks.
*
*/
private final val index = new ChunkIDPartitionChunkIndex(binPartition, dataset)

// Set initial size to a fraction of the max chunk size, so that partitions with sparse amount of data
// will not cause too much memory bloat. GrowableVector allows vectors to grow, so this should be OK
private val (initAppenders, initCurChunks) = bufferPool.obtain()
/**
* This is the ONE main data structure holding the chunks of a TSPartition. It is appended to as chunks arrive
* and are encoded. The last or most recent item should be the write buffer chunks.
* Key(ChunkID) -> Value InfoChunks(ChunkSetInfo, Array[BinaryVector])
* Thus it is sorted by increasing chunkID, yet addressable by chunkID, and concurrent.
*/
private var infosChunks = new ConcurrentSkipListMap[ChunkID, InfoChunks].asScala
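  // Aside (illustrative sketch, not from the commit): one ConcurrentSkipListMap
  // can replace the old vectors map, chunk index, and chunkIDs queue because it
  // combines sorted iteration, key lookup, and thread safety:
  //   val m = new ConcurrentSkipListMap[ChunkID, String]().asScala
  //   m.put(3L, "c3"); m.put(1L, "c1"); m.put(2L, "c2")
  //   m.keys.toList   // List(1L, 2L, 3L): iteration ordered by chunkID
  //   m.keys.last     // 3L: the newest chunk, i.e. the current write buffers
  //   m.get(2L)       // Some("c2"): addressable by chunkID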

/**
* Ingested data goes into this appender. There is one appender for each column in the dataset.
* Var mutates when buffers are switched for optimization. During switching of buffers
* in [[filodb.core.memstore.TimeSeriesPartition#switchBuffers]], current var
* value is assigned to flushingAppenders, and new appender that is added to the partition is assigned to this var.
* value is assigned to flushingAppenders, and null appenders are assigned until more data arrives.
*/
private var appenders = initAppenders
private var appenders = nullAppenders

/**
* This is essentially the chunks (binaryVectors) associated with 'appenders' member of this class. This var will
@@ -92,17 +81,17 @@ class TimeSeriesPartition(val dataset: Dataset,
* in [[filodb.core.memstore.TimeSeriesPartition#switchBuffers]],
* and new chunk is added to the partition.
*/
private var currentChunks = initCurChunks
private var currentChunks = nullChunks

/**
* This var holds the next appender to be optimized and persisted.
* Mutates when buffers are switched for optimization in [[filodb.core.memstore.TimeSeriesPartition#switchBuffers]].
* There is one element for each column of the dataset.
* Initialized to ZeroPointer since at the beginning nothing is ready to be flushed
* At the beginning nothing is ready to be flushed, so this is set to null.
*/
private var flushingAppenders = UnsafeUtils.ZeroPointer.asInstanceOf[Array[RowReaderAppender]]
private var flushingAppenders = nullAppenders

/**
/**
* Holds the id of the chunks in flushingAppender that should be flushed next.
* Mutates when buffers are switched for optimization in [[filodb.core.memstore.TimeSeriesPartition#switchBuffers]].
*
@@ -114,7 +103,7 @@
/**
* Number of columns in the dataset
*/
private final val numColumns = appenders.size
private final val numColumns = dataset.dataColumns.length


private val jvmStartTime = ManagementFactory.getRuntimeMXBean.getStartTime
@@ -125,39 +114,33 @@

private var partitionLoadedFromPersistentStore = false

// Not used for now
// private final val partitionVectors = new Array[FiloVector[_]](dataset.partitionColumns.length)

// Add initial write buffers as the first chunkSet/chunkID
initNewChunk()

/**
* Ingests a new row, adding it to currentChunks.
* Note that it is the responsibility of flush() to ensure the right currentChunks is allocated.
*/
def ingest(row: RowReader, offset: Long): Unit = {
if (appenders == nullAppenders) initNewChunk()
for { col <- 0 until numColumns optimized } {
appenders(col).append(row)
}
}

/**
* Atomically switches the writeBuffers/appenders to a new empty one.
* Atomically switches the writeBuffers/appenders to null ones. If and when more data arrives,
* the appenders are initialized to fresh ones. This way dead partitions that stop receiving data
* do not hold on to empty appenders.
* The old writeBuffers/chunks becomes flushingAppenders.
* In theory this may be called from another thread from ingest(), but then ingest() may continue to write
* to the old flushingAppenders buffers until the concurrent ingest() finishes.
* To guarantee no more writes happen when switchBuffers is called, have ingest() and switchBuffers() be
* called from a single thread / single synchronous stream.
*/
def switchBuffers(): Unit = if (currentChunks(0).length > 0) {
// Get new write buffers from pool
def switchBuffers(): Unit = if (latestChunkLen > 0) {
flushingAppenders = appenders
flushingChunkID = chunkIDs.last
val newAppendersAndChunks = bufferPool.obtain()
// Right after this all ingest() calls will append to new chunks
appenders = newAppendersAndChunks._1
currentChunks = newAppendersAndChunks._2
initNewChunk() // At this point the new buffers can be read from
flushingChunkID = infosChunks.keys.last
// Right after this all ingest() calls will check and potentially append to new chunks
appenders = nullAppenders
currentChunks = nullChunks
}
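  // Aside (illustrative sketch): the single-writer contract above can be met by
  // funneling both calls through one executor, e.g.
  //   val serial = java.util.concurrent.Executors.newSingleThreadExecutor()
  //   serial.execute(() => partition.ingest(row, offset))
  //   serial.execute(() => partition.switchBuffers())
  // so the appenders/currentChunks swap can never interleave with an
  // in-flight ingest() on another thread.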

/**
@@ -183,7 +166,7 @@ class TimeSeriesPartition(val dataset: Dataset,
//This assumption cannot break. We should ensure one vector can always be written
//to one block, at least as per the current design. We want a flush group
//to typically end in at most 2 blocks.
//TODO Check if this is alright
//TODO: Break up chunks longer than blockAllocationSize into smaller ones
assert(blockHolder.blockAllocationSize() > appender.appender.frozenSize)
val optimized = appender.appender.optimize(blockHolder)
shardStats.encodedBytes.increment(optimized.numBytes)
@@ -193,29 +176,28 @@
shardStats.numSamplesEncoded.increment(numSamples)
shardStats.numChunksEncoded.increment(frozenVectors.length)

// replace appendableVectors reference in vectors hash with compacted, immutable chunks
vectors.put(flushingChunkID, frozenVectors)

// release older appenders back to pool. Nothing at this point should reference the older appenders.
bufferPool.release(flushingAppenders)
flushingAppenders = UnsafeUtils.ZeroPointer.asInstanceOf[Array[RowReaderAppender]]

// Create ChunkSetInfo
val reader = new FastFiloRowReader(frozenVectors.asInstanceOf[Array[FiloVector[_]]])
reader.setRowNo(0)
val firstRowKey = dataset.rowKey(reader)
reader.setRowNo(numSamples - 1)
val lastRowKey = dataset.rowKey(reader)
val chunkInfo = ChunkSetInfo(flushingChunkID, numSamples, firstRowKey, lastRowKey)
index.add(chunkInfo, Nil)
val chunkInfo = infosChunks(flushingChunkID).info.copy(numRows = numSamples,
firstKey = firstRowKey,
lastKey = lastRowKey)
// replace write buffers / vectors with compacted, immutable chunks
infosChunks.put(flushingChunkID, InfoChunks(chunkInfo, frozenVectors))

// release older appenders back to pool. Nothing at this point should reference the older appenders.
bufferPool.release(flushingAppenders)
flushingAppenders = nullAppenders

blockHolder.endPartition(new ReclaimListener {
//It is very likely that a flushChunk ends in only one block. At worst it may end up in a couple.
//So a blockGroup contains at most 2. When any one Block in the flushGroup is evicted the flushChunk is removed.
//So if and when a second block gets reclaimed this is a no-op.
override def onReclaim(): Unit = {
vectors.remove(chunkInfo.id)
index.remove(chunkInfo.id)
infosChunks.remove(chunkInfo.id)
shardStats.chunkIdsEvicted.increment()
}
})
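  // Aside (illustrative sketch): the reclaim contract at work here. When the
  // block manager evicts a block it fires every registered listener, and each
  // partition drops the matching chunk so readers never touch reclaimed memory:
  //   trait Listener { def onReclaim(): Unit }
  //   final class Block {
  //     private var listeners = List.empty[Listener]
  //     def register(l: Listener): Unit = listeners ::= l
  //     def reclaim(): Unit = listeners.foreach(_.onReclaim())
  //   }
  // A chunk spanning two blocks registers on both; the second onReclaim()
  // finds the id already gone from infosChunks and is the no-op noted above.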
@@ -225,30 +207,11 @@
}
}

def latestN(n: Int): InfosSkipsIt =
if (latestChunkLen > 0) { latestChunkIt ++ index.latestN(n - 1) }
else { index.latestN(n) }

def numChunks: Int = chunkIDs.size
def latestChunkLen: Int = currentChunks(0).length

/**
* Gets the most recent n ChunkSetInfos and skipMaps (which will be empty)
*/
def newestChunkIds(n: Int): InfosSkipsIt = {
val latest = latestChunkIt
val numToTake = if (latest.isEmpty) n else (n - 1)
index.latestN(numToTake) ++ latest
}

private def latestChunkInfo: ChunkSetInfo =
ChunkSetInfo(chunkIDs.last, latestChunkLen, BinaryRecord.empty, BinaryRecord.empty)

private def latestChunkIt: InfosSkipsIt = Iterator.single((latestChunkInfo, emptySkips))
def numChunks: Int = infosChunks.size
def latestChunkLen: Int = if (currentChunks != nullChunks) currentChunks(0).length else 0

private def getVectors(columnIds: Array[Int],
vectors: Array[BinaryVector[_]],
vectLength: Int): Array[FiloVector[_]] = {
vectors: Array[BinaryVector[_]]): Array[FiloVector[_]] = {
val finalVectors = new Array[FiloVector[_]](columnIds.size)
for { i <- 0 until columnIds.size optimized } {
finalVectors(i) = if (Dataset.isPartitionID(columnIds(i))) { constPartitionVector(columnIds(i)) }
@@ -258,36 +221,48 @@
}

private def readersFromMemory(method: ChunkScanMethod, columnIds: Array[Int]): Iterator[ChunkSetReader] = {
val infosSkips = method match {
case AllChunkScan => index.allChunks ++ latestChunkIt
val infosVects = method match {
case AllChunkScan =>
infosChunks.values.toIterator
// To derive time range: r.startkey.getLong(0) -> r.endkey.getLong(0)
case r: RowKeyChunkScan => index.rowKeyRange(r.startkey, r.endkey) ++ latestChunkIt
case r @ SingleChunkScan(_, id) => index.singleChunk(r.startkey, id)
case LastSampleChunkScan => latestN(1)
case r: RowKeyChunkScan =>
infosChunks.values.toIterator.filter { ic =>
ic.info.firstKey.isEmpty || // empty key = most recent chunk, get a free pass
ic.info.intersection(r.startkey, r.endkey).isDefined
}
case LastSampleChunkScan =>
if (infosChunks.isEmpty) Iterator.empty else Iterator.single(infosChunks.values.last)
}

infosSkips.map { case (info, skips) =>
val vectArray = vectors.get(info.id)
infosVects.map { case InfoChunks(info, vectArray) =>
//scalastyle:off
require(vectArray != null,
s"INCONSISTENCY! vectArray is null, ${info.id} does not exist in vectors but $info is in index")
require(vectArray != null, "INCONSISTENCY! vectArray is null!")
//scalastyle:on
val chunkset = getVectors(columnIds, vectArray, info.numRows)
val realInfo = info match {
// First chunk, fill in real chunk length. TODO: also fill in current first & last key
case i @ ChunkSetInfo(_, -1, _, _) => i.copy(numRows = latestChunkLen)
case _: Any => info
}
val chunkset = getVectors(columnIds, vectArray)
shardStats.numChunksQueried.increment(chunkset.length)
new ChunkSetReader(info, skips, chunkset)
new ChunkSetReader(realInfo, emptySkips, chunkset)
}
}

def lastVectors: Array[FiloVector[_]] = currentChunks.asInstanceOf[Array[FiloVector[_]]]

/**
* Initializes vectors, chunkIDs for a new chunkset/chunkID.
* This is called once every chunk-duration for each group, when the buffers are switched for that group
* This is called after switchBuffers(), when the first new data arrives.
* Also compacts infosChunks tombstones.
*/
private def initNewChunk(): Unit = {
val newChunkID = timeUUID64
vectors.put(newChunkID, currentChunks.asInstanceOf[Array[BinaryVector[_]]])
chunkIDs += newChunkID
val (newAppenders, newCurChunks) = bufferPool.obtain()
appenders = newAppenders
currentChunks = newCurChunks
val newChunkId = timeUUID64
val newInfo = ChunkSetInfo(newChunkId, -1, BinaryRecord.empty, BinaryRecord.empty)
infosChunks.put(newChunkId, InfoChunks(newInfo, currentChunks.asInstanceOf[Array[BinaryVector[_]]]))
}
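  // Aside (illustrative sketch): numRows = -1 above marks a still-growing
  // write-buffer chunk. readersFromMemory patches in the live length at query time:
  //   case i @ ChunkSetInfo(_, -1, _, _) => i.copy(numRows = latestChunkLen)
  // and the flush path later replaces the whole entry with the final row count
  // and first/last row keys via the copy(...) during encoding.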

/**
@@ -345,8 +320,7 @@ class TimeSeriesPartition(val dataset: Dataset,
.flatMap(_.streamReaders(AllChunkScan, dataset.dataColumns.map(_.id).toArray)) // all chunks, all columns
readers.map { r =>
// TODO r.vectors is on-heap. It should be copied to the offheap store before adding to index.
vectors.put(r.info.id, r.vectors.map(_.asInstanceOf[BinaryVector[_]]))
index.add(r.info, Nil)
infosChunks.put(r.info.id, InfoChunks(r.info, r.vectors.map(_.asInstanceOf[BinaryVector[_]])))
}.countL.map { count =>
shardStats.partitionsPagedFromColStore.increment()
shardStats.chunkIdsPagedFromColStore.increment(count)
