Support optional filtering of null values from RAW #986
RawTableRelation.scala

@@ -66,6 +66,7 @@ class RawTableRelation(
     filter = RawRowFilter(),
     requestedKeys = None,
     schema = None,
+    filterNulls = false,
     collectMetrics = collectSchemaInferenceMetrics,
     collectTestMetrics = false
   )
@@ -83,15 +84,17 @@ class RawTableRelation(
     }
   }

-  private def getStreams(filter: RawRowFilter, cursors: Vector[String])(
+  private def getStreams(filter: RawRowFilter, filterNullFields: Boolean, cursors: Vector[String])(
       limit: Option[Int],
       numPartitions: Int)(client: GenericClient[IO]): Seq[Stream[IO, RawRow]] = {
     assert(numPartitions == cursors.length)
-    val rawClient = client.rawRows(database, table)
+    val rawClient = client.rawRows(database, table, filterNullFields)
     cursors.map(rawClient.filterOnePartition(filter, _, limit))
   }

   private def getStreamByKeys(client: GenericClient[IO], keys: Set[String]): Stream[IO, RawRow] = {
+    // Note that retrieveByKey does not currently support filtering out null fields. When/if that
+    // is added, we should also pass in the flag to filter those out here.
     val rawClient = client.rawRows(database, table)
     Stream
       .emits(keys.toSeq)

Reviewer, on the code comment above: The column filtering param is also missing for it, at least in the docs: https://api-docs.cognite.com/20230101/tag/Raw/operation/getRow
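As a sketch of what that code comment anticipates: if the by-key endpoint gains null-field filtering, the flag could be threaded through the same way getStreams now does. This is hypothetical and not part of the PR; the method body past Stream.emits and the retrieveByKey signature are assumed, since the diff truncates here.

    // Hypothetical future shape of getStreamByKeys, not part of this PR:
    private def getStreamByKeys(
        client: GenericClient[IO],
        keys: Set[String],
        filterNullFields: Boolean): Stream[IO, RawRow] = {
      // Pass the flag when building the per-table client, mirroring getStreams above.
      val rawClient = client.rawRows(database, table, filterNullFields)
      Stream
        .emits(keys.toSeq)
        .evalMap(key => rawClient.retrieveByKey(key)) // name from the code comment; signature assumed
    }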
@@ -127,6 +130,7 @@ class RawTableRelation(
       filter: RawRowFilter,
       requestedKeys: Option[Set[String]],
       schema: Option[StructType],
+      filterNulls: Boolean,
       collectMetrics: Boolean = config.collectMetrics,
       collectTestMetrics: Boolean = config.collectTestMetrics): RDD[Row] = {
     val configWithLimit =

Reviewer: rowConverter two lines below does the dataframe conversion and has inference/no-inference cases. For no-inference it will be:

    def untypedRowConverter(row: RawRow): Row =
      new GenericRowWithSchema(
        Array(
          row.key,
          row.lastUpdatedTime.map(java.sql.Timestamp.from).orNull,
          JsonObject(row.columns.toSeq: _*).asJson.noSpaces
        ),
        RawTableRelation.defaultSchema
      )

The question is whether …

Author: I've added a test that verifies that the old behavior is still there if we don't enable it. If filtering is enabled it will be filtered, but that is kind of the point.

Author: Also added one to verify that setting it to false and not setting it at all behave the same. That should cover most bases.
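To make the reviewer's point concrete, here is a minimal circe sketch (not from the PR) of what the no-inference converter's columns string looks like, depending on whether RAW has already dropped the null fields. The map contents mirror the k2 test row further down; exact key order in the output is not guaranteed.

    import io.circe.{Json, JsonObject}
    import io.circe.syntax._

    object NullFilterSketch extends App {
      val columns = Map(
        "toBeFiltered" -> Json.Null,
        "notFiltered" -> Json.fromString("string")
      )

      // Serialized the way untypedRowConverter does: JsonObject(...).asJson.noSpaces.
      // With the null field still present, it shows up as an explicit JSON null:
      println(JsonObject(columns.toSeq: _*).asJson.noSpaces)
      // e.g. {"toBeFiltered":null,"notFiltered":"string"}

      // If RAW filters the null field server-side, the key is simply absent:
      println(JsonObject(columns.filterNot(_._2.isNull).toSeq: _*).asJson.noSpaces)
      // e.g. {"notFiltered":"string"}
    }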
@@ -142,11 +146,11 @@ class RawTableRelation(
     val partitionCursors =
       CdpConnector
         .clientFromConfig(config)
-        .rawRows(database, table)
+        .rawRows(database, table, filterNulls)
         .getPartitionCursors(filter, configWithLimit.partitions)
         .unsafeRunSync()
         .toVector
-    getStreams(filter, partitionCursors)(
+    getStreams(filter, filterNulls, partitionCursors)(
       configWithLimit.limitPerPartition,
       configWithLimit.partitions)
   }
@@ -203,7 +207,13 @@ class RawTableRelation(
     }

     val rdd =
-      readRows(config.limitPerPartition, None, rawRowFilter, requestedKeys, jsonSchema)
+      readRows(
+        config.limitPerPartition,
+        None,
+        rawRowFilter,
+        requestedKeys,
+        jsonSchema,
+        config.filterNullsOnNonSchemaQueries)

     rdd.map(row => {
       val filteredCols = requiredColumns.map(colName => row.get(schema.fieldIndex(colName)))
RawTableRelationTest.scala
@@ -5,7 +5,7 @@ import cats.implicits._
 import cognite.spark.v1.CdpConnector.ioRuntime
 import com.cognite.sdk.scala.common.CdpApiException
 import com.cognite.sdk.scala.v1.{RawDatabase, RawRow, RawTable}
-import io.circe.Json
+import io.circe.{Json, JsonObject}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
@@ -117,6 +117,16 @@ class RawTableRelationTest
     RawRow("k2", Map("bool" -> Json.fromBoolean(java.lang.Boolean.parseBoolean("true")))),
     RawRow("k3", Map("bool" -> Json.fromBoolean(false)))
   )
+  private val dataWithNullFieldValue = Seq(
+    RawRow("k1", Map("toBeFiltered" -> Json.Null)),
+    RawRow("k2", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string"))),
+    RawRow("k3", Map("toBeFiltered" -> Json.fromString("but not here"), "notFiltered" -> Json.fromString("string2")))
+  )
+  private val dataWithEmptyColumn = Seq(
+    RawRow("k1", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string_k1"))),
+    RawRow("k2", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string_k2"))),
+    RawRow("k3", Map("toBeFiltered" -> Json.Null, "notFiltered" -> Json.fromString("string_k3")))
+  )

   case class TestTable(name: String, data: Seq[RawRow])
   case class TestData(dbName: String, tables: Seq[TestTable])
@@ -137,6 +147,8 @@ class RawTableRelationTest
       TestTable("with-long-empty-str", dataWithEmptyStringInLongField),
       TestTable("with-number-empty-str", dataWithEmptyStringInDoubleField),
       TestTable("with-boolean-empty-str", dataWithEmptyStringInBooleanField),
+      TestTable("with-some-null-values", dataWithNullFieldValue),
+      TestTable("with-only-null-values-for-field", dataWithEmptyColumn),
       TestTable("cryptoAssets", (1 to 500).map(i =>
         RawRow(i.toString, Map("i" -> Json.fromString("exist")))
       )),
@@ -237,7 +249,8 @@ class RawTableRelationTest
       table: String,
       database: String = "spark-test-database",
       inferSchema: Boolean = true,
-      metricsPrefix: Option[String] = None): DataFrame = {
+      metricsPrefix: Option[String] = None,
+      filterNullFields: Option[Boolean] = None): DataFrame = {
     val df = spark.read
       .format(DefaultSource.sparkFormatString)
       .useOIDCWrite
@@ -248,6 +261,8 @@ class RawTableRelationTest
       .option("inferSchema", inferSchema)
       .option("inferSchemaLimit", "100")

+    filterNullFields.foreach(v => df.option("filterNullsOnNonSchemaQueries", v.toString))
+
     metricsPrefix match {
       case Some(prefix) =>
         df.option("collectMetrics", "true")
@@ -894,4 +909,40 @@ class RawTableRelationTest
     err.getMessage shouldBe "Error while loading RAW row [key='k'] in column 'value': java.lang.NumberFormatException: For input string: \"test\""
   }

+  it should "filter out fields with null value but not impact schema inference" in {
+    val tableName = "with-some-null-values"
+    val df = rawRead(table = tableName, database = testData.dbName, inferSchema = true, filterNullFields = Some(true))
+    df.count() shouldBe 3
+    df.schema.fieldNames.toSet shouldBe Set("key", "lastUpdatedTime", "notFiltered", "toBeFiltered")
+    val items = RawJsonConverter.rowsToRawItems(df.columns, "key", df.collect().toSeq).map(r => (r.key, r.columns)).toMap
+    items("k1")("toBeFiltered") shouldBe Json.Null
+    items("k2")("toBeFiltered") shouldBe Json.Null
+    items("k2")("notFiltered") shouldBe Json.fromString("string")
+    items("k3")("toBeFiltered") shouldBe Json.fromString("but not here")
+    items("k3")("notFiltered") shouldBe Json.fromString("string2")
+  }
+
+  it should "filter out columns completely when not inferring schema (confirming it filters from RAW)" in {
+    val tableName = "with-some-null-values"
+    val df = rawRead(table = tableName, database = testData.dbName, inferSchema = false, filterNullFields = Some(true))
+    df.count() shouldBe 3
+    val items = RawJsonConverter.rowsToRawItems(df.columns, "key", df.collect().toSeq).map(r => (r.key, r.columns)).toMap
+    items("k1")("columns") shouldBe Json.fromString("{}")
+    items("k2")("columns") shouldBe Json.fromString("{\"notFiltered\":\"string\"}")
+    val columnsParsed: Option[JsonObject] = io.circe.parser.parse(items("k3")("columns").asString.get) match {
+      case Right(json) => json.asObject
+      case Left(error) => throw error
+    }
+    columnsParsed.get("notFiltered") shouldBe Some(Json.fromString("string2"))
+    columnsParsed.get("toBeFiltered") shouldBe Some(Json.fromString("but not here"))
+  }
+
+  it should "return column in schema, even if every row has it filtered out" in {
+    val tableName = "with-only-null-values-for-field"
+    val df = rawRead(table = tableName, database = testData.dbName, inferSchema = true, filterNullFields = Some(true))
+    df.count() shouldBe 3
+    df.schema.fieldNames.toSet shouldBe Set("key", "lastUpdatedTime", "notFiltered", "toBeFiltered")
+  }
 }

Reviewer, on the first new test: Let's also add a test for the no-null-filtering case.

Author: As in, if it isn't enabled it should behave as before? Sure, can do that.

Reviewer: Ah, I think what I actually wanted is the no-inference case with null filtering and without; both outcomes, a diff or no diff, would be interesting. For the inference case there is not a lot to see in the resulting dataframe, but on the other hand an extra small test never hurts.
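A sketch of the no-filtering comparison the reviewer asks for, reusing the rawRead helper from this diff. This is hypothetical, not the test that actually landed; the equality of "unset" and "explicitly false" is what the author says was covered.

    // Hypothetical test sketch, not part of this PR as merged:
    it should "behave the same whether null filtering is unset or explicitly disabled" in {
      val tableName = "with-some-null-values"
      // filterNullFields = None leaves the option unset; Some(false) sets it explicitly.
      val dfDefault = rawRead(table = tableName, database = testData.dbName, inferSchema = false)
      val dfDisabled =
        rawRead(table = tableName, database = testData.dbName, inferSchema = false, filterNullFields = Some(false))
      // With filtering off, both reads should surface identical rows, explicit nulls included.
      dfDefault.collect().toSet shouldBe dfDisabled.collect().toSet
    }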
Reviewer, on the option name: I guess it should be RAW-specific, like rawOptimizeNullTransfer or something? It may depend on whether it is purely an optimization or can be visible in the results. If it can be visible, it mostly only applies to dynamic schemas like RAW and file content (less so for data models?), if we want to expose pruned JSON outputs.

Reviewer: Another option is to name it experimentalFilterRawNulls, so that we don't apply it by default yet and can revisit the naming and semantics later after experiments.

Author: It is very RAW-specific, so naming it something in that nature makes sense! I'll find a new name for it. It will not be visible to users (the dataframe is the same), so at some point we might just make it the default.

Author: I've added the new name to README.md too, and specified that it will probably become the default in the future.
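For reference, a rough end-user sketch of enabling the flag, using the option name as it appears in this diff (the thread above says it was renamed before merge). The format string and the RAW-specific read options are assumptions based on the test helper, not taken from this PR.

    // Hypothetical usage sketch; every option name except
    // "filterNullsOnNonSchemaQueries" is an assumption about the connector's API.
    val df = spark.read
      .format("cognite.spark.v1")
      .option("type", "raw")
      .option("database", "my-db")
      .option("table", "my-table")
      .option("inferSchema", "false")
      .option("filterNullsOnNonSchemaQueries", "true")
      .load()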