Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support optional filtering of null values from RAW #986

Merged
merged 10 commits into from
Nov 14, 2024

Conversation

thorkildcognite
Copy link
Contributor

@thorkildcognite thorkildcognite commented Nov 13, 2024

This change exposes the Raw-API support for filtering nulls server side through the cdp data source. It does not impact schema interference, as that is still done without filtering.

This change exposes the Raw-API support for filtering nulls server
side through the cdp data source. It does not impact schema
interference, as that is still done without interference.
@thorkildcognite thorkildcognite requested a review from a team as a code owner November 13, 2024 10:20
@thorkildcognite thorkildcognite requested review from dmivankov and removed request for kornelione November 13, 2024 10:21
useSharedThrottle = toBoolean(parameters, "useSharedThrottle", defaultValue = false)
useSharedThrottle = toBoolean(parameters, "useSharedThrottle", defaultValue = false),
filterNullsOnNonSchemaQueries =
toBoolean(parameters, "filterNullsOnNonSchemaQueries", defaultValue = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it should be raw-specific like rawOptimizeNullTransfer or something? May depend on whether is is purely an optimization or can be visible in the results. If can be visible then mostly only applies to dynamic schemas like raw&filecontent (less so for datamodels?) - if we want to expose pruned json outputs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another option is name it experimentalFilterRawNulls so that we don't apply it by default yet and can revisit naming&semantic later after experiments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is very raw specific, so naming it something in that nature makes sense! I'll find a new name for it.

It will not be visible for the users (the dataframe is the same). So at some point we might just make it default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the new name to README.md too, and specified that it will probably become default in the future.

@@ -127,6 +130,7 @@ class RawTableRelation(
filter: RawRowFilter,
requestedKeys: Option[Set[String]],
schema: Option[StructType],
filterNulls: Boolean,
collectMetrics: Boolean = config.collectMetrics,
collectTestMetrics: Boolean = config.collectTestMetrics): RDD[Row] = {
val configWithLimit =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rowConverter 2 lines below does the dataframe conversion and has inference/noinference cases. For noinference it will be

  def untypedRowConverter(row: RawRow): Row =
    new GenericRowWithSchema(
      Array(
        row.key,
        row.lastUpdatedTime.map(java.sql.Timestamp.from).orNull,
        JsonObject(row.columns.toSeq: _*).asJson.noSpaces
      ),
      RawTableRelation.defaultSchema
    )

the question is whether JsonObject(row.columns.toSeq: _*).asJson was also dropping nulls or was keeping them (or if anything else dropped them earlier). If they were preserved up to here before then it will be observable disappearance under new parameter, we should probably find a way to keep the old visible behavior by default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a test that verifies that the old behavior is still there if we don't enable it. If filtering is enabled, then it will be filtered, though, but that is kind of the point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also added one to verify that setting it to false and not set it is the same. That should cover most bases.

cursors.map(rawClient.filterOnePartition(filter, _, limit))
}

private def getStreamByKeys(client: GenericClient[IO], keys: Set[String]): Stream[IO, RawRow] = {
// Note that retrieveByKey does not currently support filtering out null fields. When/If that is
// added, we should also pass in the flag to filter out those here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

column filtering param is also missing for it, at least in the docs https://api-docs.cognite.com/20230101/tag/Raw/operation/getRow

@@ -894,4 +909,40 @@ class RawTableRelationTest
err.getMessage shouldBe "Error while loading RAW row [key='k'] in column 'value': java.lang.NumberFormatException: For input string: \"test\""

}

it should "filter out fields with null value but not impact schema inference" in {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also add test for no-null-filtering case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As if it isn't enabled, it should behave as before? Sure, can do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think what I actually wanted is no-inference case with null filtering and without, both outcomes of there being a diff and no diff will be interesting.

For inference case not a lot to see looking at resulting dataframe, but otoh extra small test never hurts

Copy link

codecov bot commented Nov 13, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 83.23%. Comparing base (b56b570) to head (c6c8efd).
Report is 2 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #986      +/-   ##
==========================================
+ Coverage   83.17%   83.23%   +0.05%     
==========================================
  Files          48       48              
  Lines        3133     3138       +5     
  Branches      451      448       -3     
==========================================
+ Hits         2606     2612       +6     
+ Misses        527      526       -1     
Files with missing lines Coverage Δ
...rc/main/scala/cognite/spark/v1/DefaultSource.scala 91.33% <100.00%> (+0.03%) ⬆️
...main/scala/cognite/spark/v1/RawTableRelation.scala 94.70% <100.00%> (+0.14%) ⬆️
...c/main/scala/cognite/spark/v1/RelationConfig.scala 100.00% <ø> (ø)

... and 1 file with indirect coverage changes

Copy link
Contributor

@dmivankov dmivankov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as it is only under parameter also open to merging in current shape, looking into no-inference case closer at a later point when we consider enabling it by default

auto-merge was automatically disabled November 14, 2024 10:05

Pull Request is not mergeable

@dmivankov dmivankov merged commit 4e12657 into master Nov 14, 2024
4 checks passed
@dmivankov dmivankov deleted the start-filtering-those-pesky-nulls branch November 14, 2024 11:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants