Support optional filtering of null values from RAW #986

Merged · 10 commits · Nov 14, 2024
4 changes: 2 additions & 2 deletions build.sbt
@@ -11,7 +11,7 @@ val circeVersion = "0.14.9"
val sttpVersion = "3.5.2"
val natchezVersion = "0.3.1"
val Specs2Version = "4.20.3"
val cogniteSdkVersion = "2.30.860"
val cogniteSdkVersion = "2.31.861"

val prometheusVersion = "0.16.0"
val log4sVersion = "1.10.0"
@@ -41,7 +41,7 @@ lazy val commonSettings = Seq(
organization := "com.cognite.spark.datasource",
organizationName := "Cognite",
organizationHomepage := Some(url("https://cognite.com")),
version := "3.20." + patchVersion,
version := "3.21." + patchVersion,
isSnapshot := patchVersion.endsWith("-SNAPSHOT"),
crossScalaVersions := supportedScalaVersions,
semanticdbEnabled := true,
4 changes: 3 additions & 1 deletion src/main/scala/cognite/spark/v1/DefaultSource.scala
@@ -477,7 +477,9 @@ object DefaultSource {
rawEnsureParent = toBoolean(parameters, "rawEnsureParent", defaultValue = true),
enableSinglePartitionDeleteAssetHierarchy = enableSinglePartitionDeleteAssetHierarchy,
tracingParent = extractTracingHeadersKernel(parameters),
useSharedThrottle = toBoolean(parameters, "useSharedThrottle", defaultValue = false)
useSharedThrottle = toBoolean(parameters, "useSharedThrottle", defaultValue = false),
filterNullsOnNonSchemaQueries =
toBoolean(parameters, "filterNullsOnNonSchemaQueries", defaultValue = false)
Contributor:

I guess it should be RAW-specific, like rawOptimizeNullTransfer or something? It may depend on whether it is purely an optimization or can be visible in the results. If it can be visible, then it mostly only applies to dynamic schemas like raw & filecontent (less so for data models?), assuming we want to expose pruned JSON outputs.

Contributor:

Another option is to name it experimentalFilterRawNulls, so that we don't apply it by default yet and can revisit the naming and semantics later, after experiments.

Contributor (author):

It is very RAW-specific, so naming it something along those lines makes sense! I'll find a new name for it.

It will not be visible to users (the resulting DataFrame is the same), so at some point we might just make it the default.

Contributor (author):

I've added the new name to README.md too, and noted that it will probably become the default in the future.

)
}
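
For reference, a minimal sketch of how a caller would opt in from Spark once this lands (modelled on the rawRead test helper further down in this PR; authentication and the RAW database/table options are elided, since they are not shown here):

  val df = spark.read
    .format(DefaultSource.sparkFormatString)
    // ... auth and RAW database/table options as usual ...
    .option("inferSchema", "false")
    .option("filterNullsOnNonSchemaQueries", "true") // new flag, defaults to false
    .load()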

20 changes: 15 additions & 5 deletions src/main/scala/cognite/spark/v1/RawTableRelation.scala
@@ -66,6 +66,7 @@ class RawTableRelation(
filter = RawRowFilter(),
requestedKeys = None,
schema = None,
filterNulls = false,
collectMetrics = collectSchemaInferenceMetrics,
collectTestMetrics = false
)
@@ -83,15 +84,17 @@
}
}

private def getStreams(filter: RawRowFilter, cursors: Vector[String])(
private def getStreams(filter: RawRowFilter, filterNullFields: Boolean, cursors: Vector[String])(
limit: Option[Int],
numPartitions: Int)(client: GenericClient[IO]): Seq[Stream[IO, RawRow]] = {
assert(numPartitions == cursors.length)
val rawClient = client.rawRows(database, table)
val rawClient = client.rawRows(database, table, filterNullFields)
cursors.map(rawClient.filterOnePartition(filter, _, limit))
}

private def getStreamByKeys(client: GenericClient[IO], keys: Set[String]): Stream[IO, RawRow] = {
// Note that retrieveByKey does not currently support filtering out null fields. When/If that is
// added, we should also pass in the flag to filter out those here.
Contributor:

The column-filtering param is also missing for it, at least in the docs: https://api-docs.cognite.com/20230101/tag/Raw/operation/getRow

val rawClient = client.rawRows(database, table)
Stream
.emits(keys.toSeq)
@@ -127,6 +130,7 @@
filter: RawRowFilter,
requestedKeys: Option[Set[String]],
schema: Option[StructType],
filterNulls: Boolean,
collectMetrics: Boolean = config.collectMetrics,
collectTestMetrics: Boolean = config.collectTestMetrics): RDD[Row] = {
val configWithLimit =
Contributor:

rowConverter two lines below does the DataFrame conversion and has inference/no-inference cases. For the no-inference case it will be:

  def untypedRowConverter(row: RawRow): Row =
    new GenericRowWithSchema(
      Array(
        row.key,
        row.lastUpdatedTime.map(java.sql.Timestamp.from).orNull,
        JsonObject(row.columns.toSeq: _*).asJson.noSpaces
      ),
      RawTableRelation.defaultSchema
    )

The question is whether JsonObject(row.columns.toSeq: _*).asJson was also dropping nulls or keeping them (or whether anything else dropped them earlier). If they were preserved up to this point before, then their disappearance under the new parameter will be observable, and we should probably find a way to keep the old visible behavior by default.
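
For context, a quick standalone check of circe (0.14.x, as pinned in build.sbt above) suggests that explicit nulls survive the default printer, so if the SDK kept Json.Null entries in row.columns they would have been visible in the no-inference output before this change; dropping them at serialization time would need an explicit opt-in:

  import io.circe.{Json, JsonObject, Printer}

  val columns = Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("x"))
  val json = Json.fromJsonObject(JsonObject(columns.toSeq: _*))

  json.noSpaces
  // {"toBeFiltered":null,"notFiltered":"x"}  <- default printer keeps nulls

  Printer.noSpaces.copy(dropNullValues = true).print(json)
  // {"notFiltered":"x"}                      <- dropping nulls is opt-in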

Contributor (author):

I've added a test that verifies that the old behavior is still there if we don't enable it. If filtering is enabled, then it will be filtered, but that is kind of the point.

Contributor (author):

Also added one to verify that setting it to false and not setting it at all behave the same. That should cover most bases.

@@ -142,11 +146,11 @@
val partitionCursors =
CdpConnector
.clientFromConfig(config)
.rawRows(database, table)
.rawRows(database, table, filterNulls)
.getPartitionCursors(filter, configWithLimit.partitions)
.unsafeRunSync()
.toVector
getStreams(filter, partitionCursors)(
getStreams(filter, filterNulls, partitionCursors)(
configWithLimit.limitPerPartition,
configWithLimit.partitions)
}
@@ -203,7 +207,13 @@
}

val rdd =
readRows(config.limitPerPartition, None, rawRowFilter, requestedKeys, jsonSchema)
readRows(
config.limitPerPartition,
None,
rawRowFilter,
requestedKeys,
jsonSchema,
config.filterNullsOnNonSchemaQueries)

rdd.map(row => {
val filteredCols = requiredColumns.map(colName => row.get(schema.fieldIndex(colName)))
3 changes: 2 additions & 1 deletion src/main/scala/cognite/spark/v1/RelationConfig.scala
@@ -28,7 +28,8 @@ final case class RelationConfig(
enableSinglePartitionDeleteAssetHierarchy: Boolean, // flag to test whether single partition helps avoid NPE in asset hierarchy builder
tracingParent: Kernel,
initialRetryDelayMillis: Int,
useSharedThrottle: Boolean
useSharedThrottle: Boolean,
filterNullsOnNonSchemaQueries: Boolean,
) {

/** Desired number of Spark partitions ~= partitions / parallelismPerPartition */
55 changes: 53 additions & 2 deletions src/test/scala/cognite/spark/v1/RawTableRelationTest.scala
@@ -5,7 +5,7 @@ import cats.implicits._
import cognite.spark.v1.CdpConnector.ioRuntime
import com.cognite.sdk.scala.common.CdpApiException
import com.cognite.sdk.scala.v1.{RawDatabase, RawRow, RawTable}
import io.circe.Json
import io.circe.{Json, JsonObject}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
@@ -117,6 +117,16 @@ class RawTableRelationTest
RawRow("k2", Map("bool" -> Json.fromBoolean(java.lang.Boolean.parseBoolean("true")))),
RawRow("k3", Map("bool" -> Json.fromBoolean(false)))
)
private val dataWithNullFieldValue = Seq(
RawRow("k1", Map("toBeFiltered" -> Json.Null)),
RawRow("k2", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string"))),
RawRow("k3", Map("toBeFiltered" -> Json.fromString("but not here"), "notFiltered" -> Json.fromString("string2")))
)
private val dataWithEmptyColumn = Seq(
RawRow("k1", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string_k1"))),
RawRow("k2", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string_k2"))),
RawRow("k3", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string_k3"))),
)

case class TestTable(name: String, data: Seq[RawRow])
case class TestData(dbName: String, tables: Seq[TestTable])
@@ -137,6 +147,8 @@
TestTable("with-long-empty-str", dataWithEmptyStringInLongField),
TestTable("with-number-empty-str", dataWithEmptyStringInDoubleField),
TestTable("with-boolean-empty-str", dataWithEmptyStringInBooleanField),
TestTable("with-some-null-values", dataWithNullFieldValue),
TestTable("with-only-null-values-for-field", dataWithEmptyColumn),
TestTable("cryptoAssets", (1 to 500).map(i =>
RawRow(i.toString, Map("i" -> Json.fromString("exist")))
)),
@@ -237,7 +249,8 @@
table: String,
database: String = "spark-test-database",
inferSchema: Boolean = true,
metricsPrefix: Option[String] = None): DataFrame = {
metricsPrefix: Option[String] = None,
filterNullFields: Option[Boolean] = None): DataFrame = {
val df = spark.read
.format(DefaultSource.sparkFormatString)
.useOIDCWrite
@@ -248,6 +261,8 @@
.option("inferSchema", inferSchema)
.option("inferSchemaLimit", "100")

filterNullFields.foreach(v => df.option("filterNullsOnNonSchemaQueries", v.toString))

metricsPrefix match {
case Some(prefix) =>
df.option("collectMetrics", "true")
@@ -894,4 +909,40 @@
err.getMessage shouldBe "Error while loading RAW row [key='k'] in column 'value': java.lang.NumberFormatException: For input string: \"test\""

}

it should "filter out fields with null value but not impact schema inference" in {
Contributor:

Let's also add a test for the no-null-filtering case.

Contributor (author):

As in: if it isn't enabled, it should behave as before? Sure, I can do that.

Contributor:

Ah, I think what I actually wanted is the no-inference case with null filtering and without; both outcomes (a diff or no diff) will be interesting.

For the inference case there isn't a lot to see in the resulting DataFrame, but on the other hand an extra small test never hurts.

val tableName = "with-some-null-values"
val df = rawRead(table = tableName, database = testData.dbName, inferSchema = true, filterNullFields = Some(true))
df.count() shouldBe 3
df.schema.fieldNames.toSet shouldBe Set("key", "lastUpdatedTime", "notFiltered", "toBeFiltered")
val items = RawJsonConverter.rowsToRawItems(df.columns, "key", df.collect().toSeq).map(r => (r.key, r.columns)).toMap
items("k1")("toBeFiltered") shouldBe Json.Null
items("k2")("toBeFiltered") shouldBe Json.Null
items("k2")("notFiltered") shouldBe Json.fromString("string")
items("k3")("toBeFiltered") shouldBe Json.fromString("but not here")
items("k3")("notFiltered") shouldBe Json.fromString("string2")
}

it should "filter out columns completely when not inferring schema (confirming it filters from RAW)" in {
val tableName = "with-some-null-values"
val df = rawRead(table = tableName, database = testData.dbName, inferSchema = false, filterNullFields = Some(true))
df.count() shouldBe 3
val items = RawJsonConverter.rowsToRawItems(df.columns, "key", df.collect().toSeq).map(r => (r.key, r.columns)).toMap
items("k1")("columns") shouldBe Json.fromString("{}")
items("k2")("columns") shouldBe Json.fromString("{\"notFiltered\":\"string\"}")
val columnsParsed: Option[JsonObject] = io.circe.parser.parse(items("k3")("columns").asString.get) match {
case Right(json) => json.asObject
case Left(error) => throw error
}
columnsParsed.get("notFiltered") shouldBe Some(Json.fromString("string2"))
columnsParsed.get("toBeFiltered") shouldBe Some(Json.fromString("but not here"))
}

it should "return column in schema, even if every row has it filtered out" in {
val tableName = "with-only-null-values-for-field"
val df = rawRead(table = tableName, database = testData.dbName, inferSchema = true, filterNullFields = Some(true))
df.count() shouldBe 3
df.schema.fieldNames.toSet shouldBe Set("key", "lastUpdatedTime", "notFiltered", "toBeFiltered")
}

}
3 changes: 2 additions & 1 deletion src/test/scala/cognite/spark/v1/SparkTest.scala
@@ -273,7 +273,8 @@ trait SparkTest {
rawEnsureParent = false,
enableSinglePartitionDeleteAssetHierarchy = false,
tracingParent = new Kernel(Map.empty),
useSharedThrottle = false
useSharedThrottle = false,
filterNullsOnNonSchemaQueries = false
)

private def getCounterSafe(metricsNamespace: String, resource: String): Option[Long] = {