Skip to content

Commit

Permalink
byte file size limit
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob-Eliat-Eliat committed Nov 25, 2024
1 parent cf1ce3d commit 9e7276a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
26 changes: 25 additions & 1 deletion src/main/scala/cognite/spark/v1/FileContentRelation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cognite.spark.v1
import cats.effect.std.Dispatcher
import cats.effect.{IO, Resource}
import com.cognite.sdk.scala.v1.FileDownloadExternalId
import fs2.{Pipe, Stream}
import fs2.{Chunk, Pipe, Stream}
import org.apache.commons.io.FileUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col
Expand All @@ -18,12 +18,14 @@ import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend
import sttp.client3.{SttpBackend, UriContext, asStreamUnsafe, basicRequest}
import sttp.model.Uri

import java.nio.charset.Charset
import scala.collection.immutable._
import scala.concurrent.duration.Duration

// This trait exists for testing purposes: the limits are exposed as overridable
// members so that a test can replace them with small values.
trait WithSizeLimit {
// Upper bound on the size of the whole file.
// NOTE(review): presumably expressed in bytes — confirm against the code that enforces it.
val sizeLimit: Long
// Upper bound on the size of a single line of the file.
// NOTE(review): presumably expressed in UTF-8 encoded bytes — confirm against the code that enforces it.
val lineSizeLimit: Long
}

class FileContentRelation(config: RelationConfig, fileExternalId: String, inferSchema: Boolean)(
Expand All @@ -36,6 +38,9 @@ class FileContentRelation(config: RelationConfig, fileExternalId: String, inferS

// Overrides the inherited total-size limit with 5 * FileUtils.ONE_GB.
// NOTE(review): presumably this is the maximum number of bytes accepted for the whole
// file — confirm against the pipe that enforces it.
override val sizeLimit: Long = 5 * FileUtils.ONE_GB

// Maximum size of a single line: 2500 * FileUtils.ONE_KB (2500 KiB = 2,560,000 bytes,
// assuming ONE_KB is 1024). Every UTF-8 character occupies at least one byte, so a line
// within this limit holds at most roughly 2.5 million characters.
override val lineSizeLimit: Long = 2500 * FileUtils.ONE_KB

@transient private lazy val sttpFileContentStreamingBackendResource
: Resource[IO, SttpBackend[IO, Fs2Streams[IO] with WebSockets]] =
for {
Expand Down Expand Up @@ -114,6 +119,7 @@ class FileContentRelation(config: RelationConfig, fileExternalId: String, inferS
byteStream
.through(enforceSizeLimit)
.through(fs2.text.utf8.decode)
.through(enforceLineSizeLimit)
.through(fs2.text.lines)
case Left(error) =>
Stream.raiseError[IO](new Exception(s"Error while requesting underlying file: $error"))
Expand All @@ -137,6 +143,24 @@ class FileContentRelation(config: RelationConfig, fileExternalId: String, inferS
(newSize, chunk)
}

/** Pipe that passes decoded text through unchanged while enforcing `lineSizeLimit`.
  *
  * The state carried between chunks is the UTF-8 byte size of the line that is still
  * open, i.e. the text seen since the last `"\n"`. For each chunk the strings are
  * concatenated and split on `"\n"`; the first segment continues the open line, every
  * following segment starts a new line, and the last segment becomes the new open line.
  *
  * Throws `CdfSparkException` as soon as any line (complete or still open) is larger
  * than `lineSizeLimit` bytes when encoded as UTF-8.
  */
private val enforceLineSizeLimit: Pipe[IO, String, String] =
  in =>
    in.scanChunks(0L) { (acc, chunk: Chunk[String]) =>
      val utf8 = Charset.forName("UTF-8")

      // A negative limit makes `split` keep trailing empty segments, so the result is
      // never empty and its last element is always the text after the final newline
      // (an empty string when the chunk ends with "\n"). The default `split("\n")`
      // drops those segments, which made index-based access throw on such chunks.
      val segmentSizes =
        chunk.asSeq.mkString
          .split("\n", -1)
          .map(_.getBytes(utf8).length.toLong)

      // Only the first segment belongs to the line carried over from earlier chunks.
      val lineSizes = segmentSizes.updated(0, acc + segmentSizes(0))

      // Check every line touched by this chunk, not just the first two segments.
      if (lineSizes.exists(_ > lineSizeLimit))
        throw new CdfSparkException(s"Line size too big. SizeLimit: $lineSizeLimit")
      else
        (lineSizes(lineSizes.length - 1), chunk)
    }
}

object FileContentRelation extends NamedRelation {
Expand Down
20 changes: 19 additions & 1 deletion src/test/scala/cognite/spark/v1/FileContentRelationTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class FileContentRelationTest extends FlatSpec with Matchers with SparkTest wit
)
}

it should "get size from endpoint and check for it" in {
it should "limit by total size" in {
val relation = new FileContentRelation(
getDefaultConfig(auth = CdfSparkAuth.OAuth2ClientCredentials(credentials = writeCredentials), projectName = OIDCWrite.project, cluster = OIDCWrite.cluster, applicationName = Some("jetfire-test")),
fileExternalId = fileExternalId,
Expand All @@ -242,6 +242,24 @@ class FileContentRelationTest extends FlatSpec with Matchers with SparkTest wit
}
}

// Verifies that reading fails with the line-size error when the per-line limit
// is overridden with a very small value.
it should "limit by line size" in {
  val config = getDefaultConfig(
    auth = CdfSparkAuth.OAuth2ClientCredentials(credentials = writeCredentials),
    projectName = OIDCWrite.project,
    cluster = OIDCWrite.cluster,
    applicationName = Some("jetfire-test")
  )

  // Anonymous subclass shrinks the limit so the fixture file is expected to exceed it.
  val relation =
    new FileContentRelation(config, fileExternalId = fileExternalId, inferSchema = true)(
      spark.sqlContext) {
      override val lineSizeLimit: Long = 10
    }

  val expectedMessage = "Line size too big. SizeLimit: 10"
  val exception = sparkIntercept {
    relation.createDataFrame
  }

  withClue(s"Expected '$expectedMessage' but got: '${exception.getMessage}'") {
    exception.getMessage.contains(expectedMessage) shouldBe true
  }
}

it should "throw if the file was never uploaded" in {

val toCreate: FileCreate = FileCreate(
Expand Down

0 comments on commit 9e7276a

Please sign in to comment.