Merge branch 'master' into audunska/multi-output
audunska authored Jul 13, 2023
2 parents 8cba67d + 22f05a9 commit fa27165
Showing 8 changed files with 375 additions and 286 deletions.
11 changes: 6 additions & 5 deletions build.sbt
@@ -1,4 +1,5 @@
import com.typesafe.sbt.packager.docker.Cmd
+import sbtassembly.AssemblyPlugin.autoImport._
import sbtassembly.MergeStrategy

val scala212 = "2.12.15"
@@ -8,7 +9,7 @@ val sparkVersion = "3.3.1"
val circeVersion = "0.14.5"
val sttpVersion = "3.5.2"
val Specs2Version = "4.6.0"
val cogniteSdkVersion = "2.7.723"
val cogniteSdkVersion = "2.7.724"

val prometheusVersion = "0.15.0"
val log4sVersion = "1.8.2"
@@ -69,13 +70,13 @@ lazy val commonSettings = Seq(
pomIncludeRepository := { _ => false },
publishTo := {
val nexus = "https://oss.sonatype.org/"
-if (isSnapshot.value) Some("snapshots" at nexus + "content/repositories/snapshots")
-else Some("releases" at nexus + "service/local/staging/deploy/maven2")
+if (isSnapshot.value) { Some("snapshots" at nexus + "content/repositories/snapshots") }
+else { Some("releases" at nexus + "service/local/staging/deploy/maven2") }
},
publishMavenStyle := true,
pgpPassphrase := {
-if (gpgPass.isDefined) gpgPass.map(_.toCharArray)
-else None
+if (gpgPass.isDefined) { gpgPass.map(_.toCharArray) }
+else { None }
},
Test / fork := true,
Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oD"),
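
The pgpPassphrase change above is purely stylistic; the conditional is redundant in either form, since mapping over an empty Option already yields None. A minimal equivalent, assuming gpgPass is an Option[String]:

// Equivalent setting, assuming gpgPass: Option[String];
// Option.map already returns None when gpgPass is empty.
pgpPassphrase := gpgPass.map(_.toCharArray)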
@@ -57,7 +57,7 @@ private[spark] class FlexibleDataModelConnectionRelation(
.flatMap { instances =>
val instanceCreate = InstanceCreate(
items = instances,
-replace = Some(true),
+replace = Some(false),
autoCreateStartNodes = Some(true),
autoCreateEndNodes = Some(true)
)
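
For context, the replace flag controls the upsert semantics of the instances request. A hedged sketch of the assumed difference, reusing the fields shown above:

// Assumed semantics of the CDF instances API:
// replace = Some(true)  — full replace: properties omitted from the request are cleared.
// replace = Some(false) — patch: omitted properties keep their stored values,
//                         which is what the new ignoreNullFields behaviour relies on.
val patchUpsert = InstanceCreate(
  items = instances,
  replace = Some(false),
  autoCreateStartNodes = Some(true),
  autoCreateEndNodes = Some(true)
)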
@@ -245,9 +245,12 @@ private[spark] class FlexibleDataModelCorePropertyRelation(
propDefMap: Map[String, ViewPropertyDefinition],
instanceSpace: Option[String]) = {
val nodesOrEdges = intendedUsage match {
-case Usage.Node => createNodes(rows, schema, propDefMap, source, instanceSpace)
-case Usage.Edge => createEdges(rows, schema, propDefMap, source, instanceSpace)
-case Usage.All => createNodesOrEdges(rows, schema, propDefMap, source, instanceSpace)
+case Usage.Node =>
+  createNodes(rows, schema, propDefMap, source, instanceSpace, config.ignoreNullFields)
+case Usage.Edge =>
+  createEdges(rows, schema, propDefMap, source, instanceSpace, config.ignoreNullFields)
+case Usage.All =>
+  createNodesOrEdges(rows, schema, propDefMap, source, instanceSpace, config.ignoreNullFields)
}
nodesOrEdges match {
case Right(items) if items.nonEmpty =>
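
The new config.ignoreNullFields argument threaded into createNodes, createEdges and createNodesOrEdges presumably decides what a null Spark column means on write. A hypothetical sketch of that decision, not the connector's actual code (rowToProperties and the convert hook are illustrative names; InstancePropertyValue is the Cognite Scala SDK type):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataType, StructType}

// ignoreNullFields = true  -> drop null columns from the update entirely,
//                             leaving the stored value untouched.
// ignoreNullFields = false -> emit them as None, an explicit null that
//                             clears the stored value.
def rowToProperties(
    row: Row,
    schema: StructType,
    ignoreNullFields: Boolean,
    convert: (Any, DataType) => InstancePropertyValue // hypothetical conversion hook
): Map[String, Option[InstancePropertyValue]] =
  schema.fields.zipWithIndex.flatMap {
    case (field, i) if row.isNullAt(i) =>
      if (ignoreNullFields) None // skip the field entirely
      else Some(field.name -> Option.empty[InstancePropertyValue]) // explicit null
    case (field, i) =>
      Some(field.name -> Some(convert(row.get(i), field.dataType)))
  }.toMap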
275 changes: 136 additions & 139 deletions src/main/scala/cognite/spark/v1/FlexibleDataModelRelationUtils.scala

Large diffs are not rendered by default.

@@ -755,6 +755,106 @@ class FlexibleDataModelCorePropertyRelationTest
}
}

it should "handle nulls according to ignoreNullFields" in {
val externalId = s"sparkDsTestNullProps-${shortRandomString()}"
try {
// First, create a random node
val insertDf = spark
.sql(
s"""
|select
|'${externalId}' as externalId,
|'stringProp1Val' as stringProp1,
|'stringProp2Val' as stringProp2
|""".stripMargin)
val insertResult = Try {
insertRowsToModel(
modelSpace = spaceExternalId,
modelExternalId = testDataModelExternalId,
modelVersion = viewVersion,
viewExternalId = viewStartNodeAndEndNodesExternalId,
instanceSpace = Some(spaceExternalId),
insertDf
)
}
insertResult shouldBe Success(())
// Now, update one of the values with a null, but ignoreNullFields
val updateDf1 = spark
.sql(
s"""
|select
|'${externalId}' as externalId,
|'updatedProp1Val' as stringProp1,
|null as stringProp2
|""".stripMargin)
val updateResult1 = Try {
insertRowsToModel(
modelSpace = spaceExternalId,
modelExternalId = testDataModelExternalId,
modelVersion = viewVersion,
viewExternalId = viewStartNodeAndEndNodesExternalId,
instanceSpace = Some(spaceExternalId),
updateDf1,
ignoreNullFields = true
)
}
updateResult1 shouldBe Success(())

// Fetch the updated instance
val instance1 = client.instances.retrieveByExternalIds(
items = Seq(InstanceRetrieve(InstanceType.Node, externalId, spaceExternalId)),
sources = Some(Seq(InstanceSource(ViewReference(
space = spaceExternalId,
externalId = viewStartNodeAndEndNodesExternalId,
version = viewVersion
))))
).unsafeRunSync().items.head
val props1 = instance1.properties.get.get(spaceExternalId)
.get(s"${viewStartNodeAndEndNodesExternalId}/${viewVersion}")
// Check the properties
props1.get("stringProp1") shouldBe Some(InstancePropertyValue.String("updatedProp1Val"))
props1.get("stringProp2") shouldBe Some(InstancePropertyValue.String("stringProp2Val"))

// Update the value with null, and don't ignoreNullFields
val updateDf2 = spark
.sql(
s"""
|select
|'${externalId}' as externalId,
|'updatedProp1ValAgain' as stringProp1,
|null as stringProp2
|""".stripMargin)
val updateResult2 = Try {
insertRowsToModel(
modelSpace = spaceExternalId,
modelExternalId = testDataModelExternalId,
modelVersion = viewVersion,
viewExternalId = viewStartNodeAndEndNodesExternalId,
instanceSpace = Some(spaceExternalId),
updateDf2,
ignoreNullFields = false
)
}
updateResult2 shouldBe Success(())
// Fetch the updated instance
val instance = client.instances.retrieveByExternalIds(
items = Seq(InstanceRetrieve(InstanceType.Node, externalId, spaceExternalId)),
sources = Some(Seq(InstanceSource(ViewReference(
space = spaceExternalId,
externalId = viewStartNodeAndEndNodesExternalId,
version = viewVersion
))))
).unsafeRunSync().items.head
val props2 = instance.properties.get.get(spaceExternalId)
.get(s"${viewStartNodeAndEndNodesExternalId}/${viewVersion}")
// Check the properties
props2.get("stringProp1") shouldBe Some(InstancePropertyValue.String("updatedProp1ValAgain"))
props2.get("stringProp2") shouldBe None
} finally {
val _ = client.instances.delete(Seq(NodeDeletionRequest(space = spaceExternalId, externalId = externalId))).unsafeRunSync()
}
}

// This should be kept as ignored
ignore should "delete containers and views used for testing" in {
client.containers
@@ -1031,15 +1131,15 @@ class FlexibleDataModelCorePropertyRelationTest
Some(Seq(EdgeOrNodeData(
viewRef,
Some(Map(
"forEqualsFilter" -> InstancePropertyValue.String("str1"),
"forInFilter" -> InstancePropertyValue.String("str1"),
"forGteFilter" -> InstancePropertyValue.Int32(1),
"forGtFilter" -> InstancePropertyValue.Int32(2),
"forLteFilter" -> InstancePropertyValue.Int64(2),
"forLtFilter" -> InstancePropertyValue.Int64(3),
"forOrFilter1" -> InstancePropertyValue.Float64(5.1),
"forOrFilter2" -> InstancePropertyValue.Float64(6.1),
"forIsNotNullFilter" -> InstancePropertyValue.Date(LocalDate.now())
"forEqualsFilter" -> Some(InstancePropertyValue.String("str1")),
"forInFilter" -> Some(InstancePropertyValue.String("str1")),
"forGteFilter" -> Some(InstancePropertyValue.Int32(1)),
"forGtFilter" -> Some(InstancePropertyValue.Int32(2)),
"forLteFilter" -> Some(InstancePropertyValue.Int64(2)),
"forLtFilter" -> Some(InstancePropertyValue.Int64(3)),
"forOrFilter1" -> Some(InstancePropertyValue.Float64(5.1)),
"forOrFilter2" -> Some(InstancePropertyValue.Float64(6.1)),
"forIsNotNullFilter" -> Some(InstancePropertyValue.Date(LocalDate.now()))
))
)))
),
@@ -1049,18 +1149,18 @@
Some(Seq(EdgeOrNodeData(
viewRef,
Some(Map(
"forEqualsFilter" -> InstancePropertyValue.String("str2"),
"forInFilter" -> InstancePropertyValue.String("str2"),
"forGteFilter" -> InstancePropertyValue.Int32(5),
"forGtFilter" -> InstancePropertyValue.Int32(2),
"forLteFilter" -> InstancePropertyValue.Int64(1),
"forLtFilter" -> InstancePropertyValue.Int64(-1),
"forOrFilter1" -> InstancePropertyValue.Float64(5.1),
"forOrFilter2" -> InstancePropertyValue.Float64(6.1),
"forIsNotNullFilter" -> InstancePropertyValue.Date(LocalDate.now()),
"forIsNullFilter" -> InstancePropertyValue.Object(Json.fromJsonObject(
"forEqualsFilter" -> Some(InstancePropertyValue.String("str2")),
"forInFilter" -> Some(InstancePropertyValue.String("str2")),
"forGteFilter" -> Some(InstancePropertyValue.Int32(5)),
"forGtFilter" -> Some(InstancePropertyValue.Int32(2)),
"forLteFilter" -> Some(InstancePropertyValue.Int64(1)),
"forLtFilter" -> Some(InstancePropertyValue.Int64(-1)),
"forOrFilter1" -> Some(InstancePropertyValue.Float64(5.1)),
"forOrFilter2" -> Some(InstancePropertyValue.Float64(6.1)),
"forIsNotNullFilter" -> Some(InstancePropertyValue.Date(LocalDate.now())),
"forIsNullFilter" -> Some(InstancePropertyValue.Object(Json.fromJsonObject(
JsonObject("a" -> Json.fromString("a"), "b" -> Json.fromInt(1))))
))
)))
)))
)
),
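
The Option wrapping above lines up with the SDK bump to 2.7.724 in build.sbt: the properties map passed to EdgeOrNodeData evidently became Map[String, Option[InstancePropertyValue]], giving null a representation. A hedged sketch of the assumed convention:

// Assumed convention for the Option-valued properties map:
val props: Map[String, Option[InstancePropertyValue]] = Map(
  "stringProp1" -> Some(InstancePropertyValue.String("value")), // write this value
  "stringProp2" -> None // write an explicit null, clearing the stored value
)
// A key left out of the map entirely is not written at all.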
@@ -1171,7 +1271,8 @@ class FlexibleDataModelCorePropertyRelationTest
viewExternalId: String,
instanceSpace: Option[String],
df: DataFrame,
onConflict: String = "upsert"): Unit =
onConflict: String = "upsert",
ignoreNullFields: Boolean = true): Unit =
df.write
.format("cognite.spark.v1")
.option("type", FlexibleDataModelRelationFactory.ResourceType)
@@ -1189,6 +1290,7 @@
.option("onconflict", onConflict)
.option("collectMetrics", true)
.option("metricsPrefix", s"$modelExternalId-$modelVersion")
.option("ignoreNullFields", ignoreNullFields)
.save()

private def getUpsertedMetricsCountForModel(modelSpace: String, modelExternalId: String): Long =
