Skip to content

Commit

Permalink
typelevel#787 - Spark 4 starter pack
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-twiner committed Mar 1, 2024
1 parent 0616953 commit a70d5c3
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 10 deletions.
9 changes: 7 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
val sparkVersion = "3.5.0"
val sparkVersion = "3.5.0" // switch to "4.0.0-SNAPSHOT" for Spark 4 builds; requires the apache_snaps resolver below
val spark34Version = "3.4.2"
val spark33Version = "3.3.4"
val catsCoreVersion = "2.10.0"
Expand All @@ -21,6 +21,11 @@ resolvers in Global += MavenRepository(
"sonatype-s01-snapshots",
Resolver.SonatypeS01RepositoryRoot + "/snapshots"
)
resolvers in Global += MavenRepository(
"apache_snaps",
"https://repository.apache.org/content/repositories/snapshots"
)

import scala.concurrent.duration.DurationInt
import lmcoursier.definitions.CachePolicy

Expand Down Expand Up @@ -103,7 +108,7 @@ lazy val dataset = project
Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / "spark-3.3+"
)
.settings(
libraryDependencies += "com.sparkutils" %% "shim_runtime_3.5.0.oss_3.5" % shimVersion changing ()
libraryDependencies += "com.sparkutils" %% "shim_runtime_3.5.0.oss_3.5" % shimVersion changing () // use "shim_runtime_4.0.0.oss_4.0" when building against the Spark 4 snapshot
)
.settings(datasetSettings)
.settings(sparkDependencies(sparkVersion))
Expand Down
2 changes: 1 addition & 1 deletion dataset/src/main/scala/frameless/TypedColumn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package frameless
import frameless.functions.{litAggr, lit => flit}
import frameless.syntax._

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.{With => _, _} // 787 - Spark 4 source code compat
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.{Column, FramelessInternals}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.{Alias, CreateStruct}
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.types._
Expand All @@ -17,8 +16,7 @@ object FramelessInternals {

def resolveExpr(ds: Dataset[_], colNames: Seq[String]): NamedExpression = {
ds.toDF().queryExecution.analyzed.resolve(colNames, ds.sparkSession.sessionState.analyzer.resolver).getOrElse {
throw new AnalysisException(
s"""Cannot resolve column name "$colNames" among (${ds.schema.fieldNames.mkString(", ")})""")
throw org.apache.spark.sql.ShimUtils.analysisException(ds, colNames)
}
}

Expand Down
4 changes: 2 additions & 2 deletions dataset/src/test/scala/frameless/SchemaTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package frameless

import frameless.functions.aggregate._
import frameless.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{Metadata, StructType}
import org.scalacheck.Prop
import org.scalacheck.Prop._
import org.scalatest.matchers.should.Matchers

class SchemaTests extends TypedDatasetSuite with Matchers {

def structToNonNullable(struct: StructType): StructType = {
StructType(struct.fields.map( f => f.copy(nullable = false)))
StructType(struct.fields.map( f => f.copy(nullable = false, metadata = Metadata.empty))) // Spark 4 populates field metadata in derived schemas; reset it so schema comparisons ignore metadata
}

def prop[A](dataset: TypedDataset[A], ignoreNullable: Boolean = false): Prop = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package frameless.sql.rules

import frameless._
import frameless.functions.Lit
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{currentTimestamp, microsToInstant}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{microsToInstant, instantToMicros}
import org.apache.spark.sql.sources.{EqualTo, GreaterThanOrEqual, IsNotNull}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import java.time.Instant

class FramelessLitPushDownTests extends SQLRulesSuite {
private val now: Long = currentTimestamp()
private val now: Long = instantToMicros(Instant.now())

test("java.sql.Timestamp push-down") {
val expected = java.sql.Timestamp.from(microsToInstant(now))
Expand Down

0 comments on commit a70d5c3

Please sign in to comment.