Skip to content

Commit

Permalink
Merge branch 'master' into sequence-should-support-update
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob-Eliat-Eliat authored Nov 15, 2024
2 parents 97c7ea4 + 9f48e29 commit 56806d1
Show file tree
Hide file tree
Showing 10 changed files with 13 additions and 35 deletions.
3 changes: 2 additions & 1 deletion src/main/scala/cognite/spark/v1/AssetsRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,14 @@ object AssetsRelation
with NamedRelation
with InsertSchema
with DeleteWithIdSchema
with UpdateSchemaFromUpsertSchema {
with UpdateSchema {
override val name = "assets"
import cognite.spark.compiletime.macros.StructTypeEncoderMacro._

override val upsertSchema: StructType = structType[AssetsUpsertSchema]()
override val insertSchema: StructType = structType[AssetsInsertSchema]()
override val readSchema: StructType = structType[AssetsReadSchema]()
override val updateSchema: StructType = upsertSchema

}

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/cognite/spark/v1/DataSetsRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ object DataSetsRelation
extends UpsertSchema
with ReadSchema
with InsertSchema
with UpdateSchemaFromUpsertSchema
with UpdateSchema
with NamedRelation {
override val name = "datasets"
import cognite.spark.compiletime.macros.StructTypeEncoderMacro._

override val upsertSchema: StructType = structType[DataSetsUpsertSchema]()
override val insertSchema: StructType = structType[DataSetsInsertSchema]()
override val readSchema: StructType = structType[DataSetsReadSchema]()
override val updateSchema: StructType = upsertSchema

}

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/cognite/spark/v1/EventsRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ object EventsRelation
with NamedRelation
with InsertSchema
with DeleteWithIdSchema
with UpdateSchemaFromUpsertSchema {
with UpdateSchema {
override val name: String = "events"
import cognite.spark.compiletime.macros.StructTypeEncoderMacro._

override val upsertSchema: StructType = structType[EventsUpsertSchema]()
override val insertSchema: StructType = structType[EventsInsertSchema]()
override val readSchema: StructType = structType[EventsReadSchema]()
override val updateSchema: StructType = upsertSchema
}

trait WithNullableExternalId extends WithExternalIdGeneric[OptionalField] {
Expand Down
8 changes: 0 additions & 8 deletions src/main/scala/cognite/spark/v1/FileContentRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class FileContentRelation(config: RelationConfig, fileExternalId: String, inferS
)(backend => backend.close())
} yield backend

val acceptedMimeTypes: Seq[String] =
Seq("application/jsonlines", "application/x-ndjson", "application/jsonl")

@transient private lazy val dataFrame: DataFrame = createDataFrame

override def schema: StructType =
Expand All @@ -64,15 +61,10 @@ class FileContentRelation(config: RelationConfig, fileExternalId: String, inferS

val validUrl = for {
file <- client.files.retrieveByExternalId(fileExternalId)
mimeType <- IO.pure(file.mimeType)
_ <- IO.raiseWhen(!file.uploaded)(
new CdfSparkException(
f"Could not read file because no file was uploaded for externalId: $fileExternalId")
)
_ <- IO.raiseWhen(mimeType.exists(!acceptedMimeTypes.contains(_)))(
new CdfSparkException("Wrong mimetype. Expects application/jsonlines")
)

downloadLink <- client.files
.downloadLink(FileDownloadExternalId(fileExternalId))
uri <- IO.pure(uri"${downloadLink.downloadUrl}")
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/cognite/spark/v1/FilesRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,15 @@ object FilesRelation
with ReadSchema
with InsertSchema
with DeleteWithIdSchema
with UpdateSchemaFromUpsertSchema
with UpdateSchema
with NamedRelation {
override val name: String = "files"
import cognite.spark.compiletime.macros.StructTypeEncoderMacro._

override val upsertSchema: StructType = structType[FilesUpsertSchema]()
override val insertSchema: StructType = structType[FilesInsertSchema]()
override val readSchema: StructType = structType[FilesReadSchema]()

override val updateSchema: StructType = upsertSchema
}

final case class FilesUpsertSchema(
Expand Down
4 changes: 0 additions & 4 deletions src/main/scala/cognite/spark/v1/RelationSchemas.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ trait UpdateSchema {
val updateSchema: StructType
}

trait UpdateSchemaFromUpsertSchema extends UpsertSchema with UpdateSchema {
override val updateSchema: StructType = upsertSchema
}

// TODO: this isn't applied to some relations that have read support
trait ReadSchema {
val readSchema: StructType
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/cognite/spark/v1/RelationshipsRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,15 @@ object RelationshipsRelation
with ReadSchema
with DeleteWithExternalIdSchema
with InsertSchema
with UpdateSchemaFromUpsertSchema
with UpdateSchema
with NamedRelation {
override val name: String = "relationships"
import cognite.spark.compiletime.macros.StructTypeEncoderMacro._

override val insertSchema: StructType = structType[RelationshipsInsertSchema]()
override val readSchema: StructType = structType[RelationshipsReadSchema]()
override val upsertSchema: StructType = structType[RelationshipsUpsertSchema]()
override val updateSchema: StructType = upsertSchema
}

final case class RelationshipsInsertSchema(
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/cognite/spark/v1/TimeSeriesRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cognite.spark.v1
import cats.effect.IO
import cognite.spark.v1.PushdownUtilities._
import cognite.spark.compiletime.macros.SparkSchemaHelper._
import com.cognite.sdk.scala.common.{WithId}
import com.cognite.sdk.scala.common.WithId
import com.cognite.sdk.scala.v1._
import com.cognite.sdk.scala.v1.resources.TimeSeriesResource
import fs2.Stream
Expand Down Expand Up @@ -96,14 +96,14 @@ object TimeSeriesRelation
with InsertSchema
with NamedRelation
with DeleteWithIdSchema
with UpdateSchemaFromUpsertSchema {
with UpdateSchema {
override val name: String = "timeseries"
import cognite.spark.compiletime.macros.StructTypeEncoderMacro._

override val upsertSchema: StructType = structType[TimeSeriesUpsertSchema]()
override val insertSchema: StructType = structType[TimeSeriesInsertSchema]()
override val readSchema: StructType = structType[TimeSeriesReadSchema]()

override val updateSchema: StructType = upsertSchema
}

final case class TimeSeriesUpsertSchema(
Expand Down
13 changes: 0 additions & 13 deletions src/test/scala/cognite/spark/v1/FileContentRelationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,6 @@ class FileContentRelationTest extends FlatSpec with Matchers with SparkTest wit
)
}

it should "fail with a sensible error if the mimetype is wrong" in {
val exception = sparkIntercept {
val sourceDf: DataFrame = dataFrameReaderUsingOidc
.useOIDCWrite
.option("type", "filecontent")
.option("externalId", fileWithWrongMimeTypeExternalId)
.load()
sourceDf.createOrReplaceTempView("fileContent")
spark.sqlContext.sql(s"select * from filecontent").collect()
}
assert(exception.getMessage.contains("Wrong mimetype. Expects application/jsonlines"))
}

it should "infer the schema" in {
val sourceDf: DataFrame = dataFrameReaderUsingOidc
.useOIDCWrite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,4 @@ class SttpClientBackendFactoryTest extends FlatSpec with Matchers {
asyncHttpClient.close()
asyncHttpClient.isClosed shouldBe true
}

}

0 comments on commit 56806d1

Please sign in to comment.