splain plugin
+jvmopts

scalafmt
tribbloid committed Nov 26, 2023
1 parent 75fc432 commit f58ccd8
Showing 159 changed files with 10,375 additions and 6,249 deletions.
1 change: 1 addition & 0 deletions .jvmopts
@@ -0,0 +1 @@
+-Xmx4G
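
Context: the sbt launcher reads .jvmopts and passes each line verbatim to the build JVM, so this new file raises the build's maximum heap to 4 GB (note the file's single line is itself a JVM flag, hence the "+-" above).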
17 changes: 16 additions & 1 deletion build.sbt
@@ -18,7 +18,7 @@ val Scala213 = "2.13.12"
ThisBuild / tlBaseVersion := "0.15"

ThisBuild / crossScalaVersions := Seq(Scala213, Scala212)
-ThisBuild / scalaVersion := Scala212
+ThisBuild / scalaVersion := Scala213

lazy val root = project
.in(file("."))
@@ -315,6 +315,21 @@ lazy val framelessSettings = Seq(
mimaPreviousArtifacts ~= {
_.filterNot(_.revision == "0.11.0") // didn't release properly
},
+//
+// addCompilerPlugin(
+//   "io.tryp" % "splain" % "1.1.0-RC0" cross CrossVersion.patch
+// ),
+// scalacOptions ++= Seq(
+//   "-Vimplicits",
+//   "-Vimplicits-verbose-tree",
+//   "-Vtype-diffs",
+//   "-P:splain:Vimplicits-diverging",
+//   "-P:splain:Vtype-detail:4",
+//   "-P:splain:Vtype-diffs-detail:3"
+//   // "-P:splain:Vdebug"
+// ),
+//
+
/**
* The old Scala XML is pulled from Scala 2.12.x.
*
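
For context: splain, together with the -Vimplicits and -Vtype-diffs options that Scala 2.13 adopted from it, reformats failed implicit searches into readable trees of the candidate instances the compiler tried. A minimal sketch of the kind of failure these commented-out options would elaborate; the type and method names below are illustrative, not part of this commit:

  // Hypothetical example: with the options above enabled, this failing
  // implicit search is reported as a tree of tried instances rather
  // than a single opaque error line.
  case class Meters(value: Double)

  def largest[A](xs: List[A])(implicit ord: Ordering[A]): A = xs.max

  // largest(List(Meters(1), Meters(2)))
  // does not compile: no implicit Ordering[Meters] is in scope
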
17 changes: 12 additions & 5 deletions cats/src/main/scala/frameless/cats/FramelessSyntax.scala
@@ -7,18 +7,25 @@ import _root_.cats.mtl.Ask
import org.apache.spark.sql.SparkSession

trait FramelessSyntax extends frameless.FramelessSyntax {
-implicit class SparkJobOps[F[_], A](fa: F[A])(implicit S: Sync[F], A: Ask[F, SparkSession]) {
+
+implicit class SparkJobOps[F[_], A](
+    fa: F[A]
+  )(implicit
+    S: Sync[F],
+    A: Ask[F, SparkSession]) {
import S._, A._

def withLocalProperty(key: String, value: String): F[A] =
for {
session <- ask
-        _ <- delay(session.sparkContext.setLocalProperty(key, value))
-        a <- fa
+        _ <- delay(session.sparkContext.setLocalProperty(key, value))
+        a <- fa
} yield a

-def withGroupId(groupId: String): F[A] = withLocalProperty("spark.jobGroup.id", groupId)
+def withGroupId(groupId: String): F[A] =
+  withLocalProperty("spark.jobGroup.id", groupId)

-def withDescription(description: String): F[A] = withLocalProperty("spark.job.description", description)
+def withDescription(description: String): F[A] =
+  withLocalProperty("spark.job.description", description)
}
}
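
The reformatting above does not change behavior. For reference, a sketch of how this syntax is used, mirroring FramelessSyntaxTests further down and assuming the kind-projector plugin that the tests rely on; ReaderT[IO, SparkSession, *] supplies both the Sync and the Ask[F, SparkSession] instances:

  import cats.data.ReaderT
  import cats.effect.IO
  import cats.syntax.applicative._
  import org.apache.spark.sql.SparkSession
  import frameless.cats.implicits._

  // Illustrative job: the group id and description land in the Spark
  // context as spark.jobGroup.id and spark.job.description.
  val job: ReaderT[IO, SparkSession, Int] =
    1.pure[ReaderT[IO, SparkSession, *]]
      .withGroupId("example-group")
      .withDescription("example description")
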
13 changes: 11 additions & 2 deletions cats/src/main/scala/frameless/cats/SparkDelayInstances.scala
@@ -5,7 +5,16 @@ import _root_.cats.effect.Sync
import org.apache.spark.sql.SparkSession

trait SparkDelayInstances {
-implicit def framelessCatsSparkDelayForSync[F[_]](implicit S: Sync[F]): SparkDelay[F] = new SparkDelay[F] {
-  def delay[A](a: => A)(implicit spark: SparkSession): F[A] = S.delay(a)
+
+implicit def framelessCatsSparkDelayForSync[F[_]](
+    implicit
+    S: Sync[F]
+  ): SparkDelay[F] = new SparkDelay[F] {
+
+  def delay[A](
+      a: => A
+    )(implicit
+      spark: SparkSession
+    ): F[A] = S.delay(a)
}
}
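
In practice this instance is what lets frameless actions be suspended in any Sync effect. A minimal sketch, assuming some ds: TypedDataset[Long] is already in scope (ds is hypothetical, not from this commit):

  import cats.effect.IO
  import frameless.cats.implicits._ // derives SparkDelay[IO] from Sync[IO]

  // collect[F]() requires a SparkDelay[F]; nothing runs until the IO does.
  // val rows: IO[Seq[Long]] = ds.collect[IO]()
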
1 change: 1 addition & 0 deletions cats/src/main/scala/frameless/cats/SparkTask.scala
@@ -6,6 +6,7 @@ import _root_.cats.data.Kleisli
import org.apache.spark.SparkContext

object SparkTask {

def apply[A](f: SparkContext => A): SparkTask[A] =
Kleisli[Id, SparkContext, A](f)

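
A SparkTask is a Kleisli[Id, SparkContext, A], so running it executes immediately. A small illustrative sketch, assuming a live sparkContext is available (the count job is made up for illustration):

  import frameless.cats.SparkTask

  // val task = SparkTask { sc => sc.parallelize(1 to 3).count() }
  // val n: Long = task.run(sparkContext) // Id effect: runs right away
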
76 changes: 61 additions & 15 deletions cats/src/main/scala/frameless/cats/implicits.scala
@@ -2,73 +2,119 @@ package frameless
package cats

import _root_.cats._
-import _root_.cats.kernel.{CommutativeMonoid, CommutativeSemigroup}
+import _root_.cats.kernel.{ CommutativeMonoid, CommutativeSemigroup }
import _root_.cats.syntax.all._
import alleycats.Empty

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object implicits extends FramelessSyntax with SparkDelayInstances {

implicit class rddOps[A: ClassTag](lhs: RDD[A]) {
-def csum(implicit m: CommutativeMonoid[A]): A =
+
+def csum(
+    implicit
+    m: CommutativeMonoid[A]
+  ): A =
lhs.fold(m.empty)(_ |+| _)
-def csumOption(implicit m: CommutativeSemigroup[A]): Option[A] =
+
+def csumOption(
+    implicit
+    m: CommutativeSemigroup[A]
+  ): Option[A] =
lhs.aggregate[Option[A]](None)(
(acc, a) => Some(acc.fold(a)(_ |+| a)),
(l, r) => l.fold(r)(x => r.map(_ |+| x) orElse Some(x))
)

-def cmin(implicit o: Order[A], e: Empty[A]): A = {
+def cmin(implicit
+    o: Order[A],
+    e: Empty[A]
+  ): A = {
if (lhs.isEmpty()) e.empty
else lhs.reduce(_ min _)
}
-def cminOption(implicit o: Order[A]): Option[A] =
+
+def cminOption(
+    implicit
+    o: Order[A]
+  ): Option[A] =
csumOption(new CommutativeSemigroup[A] {
def combine(l: A, r: A) = l min r
})

-def cmax(implicit o: Order[A], e: Empty[A]): A = {
+def cmax(implicit
+    o: Order[A],
+    e: Empty[A]
+  ): A = {
if (lhs.isEmpty()) e.empty
else lhs.reduce(_ max _)
}
-def cmaxOption(implicit o: Order[A]): Option[A] =
+
+def cmaxOption(
+    implicit
+    o: Order[A]
+  ): Option[A] =
csumOption(new CommutativeSemigroup[A] {
def combine(l: A, r: A) = l max r
})
}

implicit class pairRddOps[K: ClassTag, V: ClassTag](lhs: RDD[(K, V)]) {
-def csumByKey(implicit m: CommutativeSemigroup[V]): RDD[(K, V)] = lhs.reduceByKey(_ |+| _)
-def cminByKey(implicit o: Order[V]): RDD[(K, V)] = lhs.reduceByKey(_ min _)
-def cmaxByKey(implicit o: Order[V]): RDD[(K, V)] = lhs.reduceByKey(_ max _)
+
+def csumByKey(
+    implicit
+    m: CommutativeSemigroup[V]
+  ): RDD[(K, V)] = lhs.reduceByKey(_ |+| _)
+
+def cminByKey(
+    implicit
+    o: Order[V]
+  ): RDD[(K, V)] = lhs.reduceByKey(_ min _)
+
+def cmaxByKey(
+    implicit
+    o: Order[V]
+  ): RDD[(K, V)] = lhs.reduceByKey(_ max _)
}
}

object union {

implicit def unionSemigroup[A]: Semigroup[RDD[A]] =
new Semigroup[RDD[A]] {
def combine(lhs: RDD[A], rhs: RDD[A]): RDD[A] = lhs union rhs
}
}

object inner {
-implicit def pairwiseInnerSemigroup[K: ClassTag, V: ClassTag: Semigroup]: Semigroup[RDD[(K, V)]] =
+
+implicit def pairwiseInnerSemigroup[
+    K: ClassTag,
+    V: ClassTag: Semigroup
+  ]: Semigroup[RDD[(K, V)]] =
new Semigroup[RDD[(K, V)]] {

def combine(lhs: RDD[(K, V)], rhs: RDD[(K, V)]): RDD[(K, V)] =
lhs.join(rhs).mapValues { case (x, y) => x |+| y }
}
}

object outer {
-implicit def pairwiseOuterSemigroup[K: ClassTag, V: ClassTag](implicit m: Monoid[V]): Semigroup[RDD[(K, V)]] =
+
+implicit def pairwiseOuterSemigroup[K: ClassTag, V: ClassTag](
+    implicit
+    m: Monoid[V]
+  ): Semigroup[RDD[(K, V)]] =
new Semigroup[RDD[(K, V)]] {

def combine(lhs: RDD[(K, V)], rhs: RDD[(K, V)]): RDD[(K, V)] =
lhs.fullOuterJoin(rhs).mapValues {
case (Some(x), Some(y)) => x |+| y
-          case (None, Some(y)) => y
-          case (Some(x), None) => x
-          case (None, None) => m.empty
+          case (None, Some(y)) => y
+          case (Some(x), None) => x
+          case (None, None)    => m.empty
}
}
}
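
None of the reformatting above changes semantics. For reference, a rough sketch of what these enrichments and semigroup instances provide, assuming an active SparkContext sc (the sample data is illustrative):

  import frameless.cats.implicits._

  // val xs = sc.parallelize(Seq(1, 2, 3, 4))
  // xs.csum                            // 10, via CommutativeMonoid[Int]
  // xs.cmaxOption                      // Some(4)

  // val pairs = sc.parallelize(Seq("a" -> 1, "a" -> 2, "b" -> 3))
  // pairs.csumByKey.collect().toMap    // Map("a" -> 3, "b" -> 3)

  // How |+| combines keyed RDDs is chosen by import: frameless.cats.union
  // keeps all rows, inner combines only matching keys via join, and outer
  // pads missing keys with Monoid.empty via fullOuterJoin.
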
24 changes: 16 additions & 8 deletions cats/src/test/scala/frameless/cats/FramelessSyntaxTests.scala
@@ -6,16 +6,18 @@ import _root_.cats.effect.IO
import _root_.cats.effect.unsafe.implicits.global
import org.apache.spark.sql.SparkSession
import org.scalatest.matchers.should.Matchers
-import org.scalacheck.{Test => PTest}
+import org.scalacheck.{ Test => PTest }
import org.scalacheck.Prop, Prop._
import org.scalacheck.effect.PropF, PropF._

class FramelessSyntaxTests extends TypedDatasetSuite with Matchers {
override val sparkDelay = null

-  def prop[A, B](data: Vector[X2[A, B]])(
-    implicit ev: TypedEncoder[X2[A, B]]
-  ): Prop = {
+  def prop[A, B](
+      data: Vector[X2[A, B]]
+    )(implicit
+      ev: TypedEncoder[X2[A, B]]
+    ): Prop = {
import implicits._

val dataset = TypedDataset.create(data).dataset
@@ -24,7 +26,13 @@ class FramelessSyntaxTests extends TypedDatasetSuite with Matchers {
val typedDataset = dataset.typed
val typedDatasetFromDataFrame = dataframe.unsafeTyped[X2[A, B]]

-typedDataset.collect[IO]().unsafeRunSync().toVector ?= typedDatasetFromDataFrame.collect[IO]().unsafeRunSync().toVector
+typedDataset
+  .collect[IO]()
+  .unsafeRunSync()
+  .toVector ?= typedDatasetFromDataFrame
+  .collect[IO]()
+  .unsafeRunSync()
+  .toVector
}

test("dataset typed - toTyped") {
@@ -37,8 +45,7 @@

forAllF { (k: String, v: String) =>
val scopedKey = "frameless.tests." + k
-1
-  .pure[ReaderT[IO, SparkSession, *]]
+1.pure[ReaderT[IO, SparkSession, *]]
.withLocalProperty(scopedKey, v)
.withGroupId(v)
.withDescription(v)
@@ -47,7 +54,8 @@
sc.getLocalProperty(scopedKey) shouldBe v
sc.getLocalProperty("spark.jobGroup.id") shouldBe v
sc.getLocalProperty("spark.job.description") shouldBe v
-}.void
+}
+  .void
}.check().unsafeRunSync().status shouldBe PTest.Passed
}
}