From 5aa7797476f13e7a31fb66b0fbd99d86e2efad53 Mon Sep 17 00:00:00 2001
From: Alex Theimer
Date: Thu, 7 Apr 2022 23:57:10 -0700
Subject: [PATCH] TEMP-- adapt existing test/code for no-shard-key case

---
 .../columnstore/CassandraColumnStore.scala | 31 ++++++++++++++++++++-------
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala
index f7eb299e4..185701ae5 100644
--- a/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala
+++ b/cassandra/src/main/scala/filodb.cassandra/columnstore/CassandraColumnStore.scala
@@ -3,11 +3,9 @@ package filodb.cassandra.columnstore
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
 import java.util.concurrent.TimeUnit
-
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
-
 import com.datastax.driver.core.{ConsistencyLevel, Metadata, Session, TokenRange}
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
 import com.typesafe.config.Config
@@ -17,7 +15,6 @@ import kamon.metric.MeasurementUnit
 import monix.eval.Task
 import monix.execution.Scheduler
 import monix.reactive.Observable
-
 import filodb.cassandra.FiloCassandraConnector
 import filodb.core._
 import filodb.core.binaryrecord2.RecordSchema
@@ -25,6 +22,7 @@ import filodb.core.metadata.Schemas
 import filodb.core.store._
 import filodb.memory.BinaryRegionLarge
 import filodb.memory.format.UnsafeUtils
+import filodb.core.ErrorResponse
 
 /**
  * Implementation of a column store using Apache Cassandra tables.
@@ -460,7 +458,15 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
   // TODO(a_theimer): should be private, but used in test-- generalize and move to util class?
   def shardKeyFromPartKey(partKey: Array[Byte], schemas: Schemas): Array[Byte] = {
-    val nonMetricShardKeyCols = schemas(RecordSchema.schemaID(partKey)).options.nonMetricShardColumns
+    val schemaId = RecordSchema.schemaID(partKey)
+    if (schemaId == -1) {
+      return Array.emptyByteArray
+    }
+    val schema = schemas(schemaId)
+    if (schema == null) {
+      return Array.emptyByteArray
+    }
+    val nonMetricShardKeyCols = schema.options.nonMetricShardColumns
     schemas.part.binSchema.colValues(partKey, UnsafeUtils.arayOffset, nonMetricShardKeyCols)
       .flatMap(_.getBytes).toArray // TODO(a_theimer): this is wildly inefficient
   }
@@ -485,7 +491,11 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
     }
     val writeSkToPkFut = {
       val shardKey = shardKeyFromPartKey(pk.partKey, schemas)
-      skToPkTable.addMapping(shardKey, pk.partKey, diskTTLSeconds)
+      if (shardKey.nonEmpty) {
+        skToPkTable.addMapping(shardKey, pk.partKey, diskTTLSeconds)
+      } else {
+        Future.successful(Success)
+      }
     }
     Task.fromFuture(Future.sequence(Seq(writePkFut, writeSkToPkFut))).map{ respSeq =>
       sinkStats.partKeysWrite(1)
@@ -520,8 +530,15 @@ extends ColumnStore with CassandraChunkSource with StrictLogging {
     val pkTable = getOrCreatePartitionKeysTable(ref, shard)
     val skToPkTable = getOrCreateShardKeyToPartKeyTable(ref, shard)
     val shardKey = shardKeyFromPartKey(pk, schemas)
-    pkTable.deletePartKeyNoAsync(pk)
-    skToPkTable.deleteMappingNoAsync(shardKey, pk)
+    // TODO(a_theimer): literally anything better than this
+    var resp = pkTable.deletePartKeyNoAsync(pk)
+    if (shardKey.nonEmpty) {
+      skToPkTable.deleteMappingNoAsync(shardKey, pk) match {
+        case er: ErrorResponse => resp = er
+        case _ => // mapping delete succeeded; keep the part-key delete response
+      }
+    }
+    resp
   }
 
   def getPartKeysByUpdateHour(ref: DatasetRef,
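
Notes (editorial, not part of the patch):

shardKeyFromPartKey now returns an empty array as a sentinel for the
no-shard-key case, and each caller branches on nonEmpty before touching the
shard-key-to-part-key table. On the write path, both table writes are combined
with a single Future.sequence, so the no-shard-key branch must still yield a
completed Future rather than skipping the write outright. A minimal,
self-contained sketch of that shape in plain Scala -- the names writePk and
writeSkToPk are illustrative stand-ins, not FiloDB APIs:

    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    object WritePathSketch {
      implicit val ec: ExecutionContext = ExecutionContext.global

      sealed trait Response
      case object Success extends Response

      def writePk(): Future[Response] = Future(Success)     // stand-in for the part-key table write
      def writeSkToPk(): Future[Response] = Future(Success) // stand-in for skToPkTable.addMapping

      def main(args: Array[String]): Unit = {
        val hasShardKey = false // the no-shard-key case this patch adapts for
        val skFut =
          if (hasShardKey) writeSkToPk()
          else Future.successful(Success) // nothing to map: an already-completed Success
        val combined = Future.sequence(Seq(writePk(), skFut))
        assert(Await.result(combined, 5.seconds).forall(_ == Success))
      }
    }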
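The delete path folds two synchronous deletes into one response: the part-key
delete always runs, the mapping delete runs only when a shard key exists, and
an ErrorResponse from the mapping delete overrides the part-key result. A
sketch of that merge rule with plain illustrative types (FiloDB's actual
Response hierarchy differs):

    object DeleteMergeSketch {
      sealed trait Response
      case object Success extends Response
      final case class ErrorResponse(msg: String) extends Response

      // skResp is None when there was no shard key, so no mapping delete ran.
      // An error from the mapping delete wins; otherwise the part-key result stands.
      def merge(pkResp: Response, skResp: Option[Response]): Response =
        skResp match {
          case Some(er: ErrorResponse) => er
          case _                       => pkResp
        }

      def main(args: Array[String]): Unit = {
        assert(merge(Success, None) == Success)              // no-shard-key case
        assert(merge(Success, Some(ErrorResponse("timeout")))
          == ErrorResponse("timeout"))                       // mapping delete failed
      }
    }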