Skip to content

Commit

Permalink
Stream transformer: Use Http4s client for iglu lookups (close #1258)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and oguzhanunlu committed Jun 2, 2023
1 parent c9cd80f commit 5e88175
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import fs2.compression.{Compression => FS2Compression}
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload, Processor}
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
Expand Down Expand Up @@ -69,6 +70,7 @@ object Processing {
config: Config,
processor: Processor
): Stream[F, Unit] = {
implicit val lookup: RegistryLookup[F] = resources.registryLookup
val transformer: Transformer[F] = config.formats match {
case f: TransformerConfig.Formats.Shred =>
Transformer.ShredTransformer(resources.igluResolver, resources.propertiesCache, f, processor)
Expand Down Expand Up @@ -117,7 +119,7 @@ object Processing {
}

/** Build a sink according to settings and pass it through `generic.Partitioned` */
def getSink[F[_]: Async, C: Monoid](
def getSink[F[_]: Async: RegistryLookup, C: Monoid](
resources: Resources[F, C],
config: Config.Output,
formats: Formats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.typelevel.log4cats.Logger

import io.circe.Json

import cats.Applicative
import cats.{Applicative, Monad, MonadThrow}
import cats.implicits._
import cats.effect._
import cats.effect.std.Random
Expand Down Expand Up @@ -59,7 +59,8 @@ case class Resources[F[_], C](
inputStream: Queue.Consumer[F],
checkpointer: Queue.Consumer.Message[F] => C,
blobStorage: BlobStorage[F],
badSink: BadSink[F]
badSink: BadSink[F],
registryLookup: RegistryLookup[F]
)

object Resources {
Expand Down Expand Up @@ -114,7 +115,8 @@ object Resources {
inputStream,
checkpointer,
blobStorage,
badSink
badSink,
registryLookup
)

private def mkBadSink[F[_]: Applicative](
Expand Down Expand Up @@ -145,17 +147,21 @@ object Resources {
case Left(error) => Sync[F].raiseError[Resolver[F]](error)
}
}
private def mkEventParser[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] = Resource.eval {
mkAtomicLengths(igluResolver, config).flatMap {
case Right(atomicLengths) => Sync[F].pure(Event.parser(atomicLengths))
case Left(error) => Sync[F].raiseError[EventParser](error)
private def mkEventParser[F[_]: MonadThrow: Clock: RegistryLookup](igluResolver: Resolver[F], config: Config): Resource[F, EventParser] =
Resource.eval {
mkAtomicLengths(igluResolver, config).flatMap {
case Right(atomicLengths) => Monad[F].pure(Event.parser(atomicLengths))
case Left(error) => MonadThrow[F].raiseError[EventParser](error)
}
}
}
private def mkAtomicLengths[F[_]: Sync: Clock](igluResolver: Resolver[F], config: Config): F[Either[RuntimeException, Map[String, Int]]] =
private def mkAtomicLengths[F[_]: Monad: Clock: RegistryLookup](
igluResolver: Resolver[F],
config: Config
): F[Either[RuntimeException, Map[String, Int]]] =
if (config.featureFlags.truncateAtomicFields) {
EventUtils.getAtomicLengths(igluResolver)
} else {
Sync[F].pure(Right(Map.empty[String, Int]))
Monad[F].pure(Right(Map.empty[String, Int]))
}

private def mkTransformerInstanceId[F[_]: Sync] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.parquet

import cats.data.EitherT
import cats.effect.Async
import cats.Monad
import cats.effect.{Async, Clock}
import cats.implicits._
import com.github.mjakubowski84.parquet4s.{ParquetWriter, Path, RowParquetRecord}
import com.github.mjakubowski84.parquet4s.parquet.viaParquet
import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup
import com.snowplowanalytics.snowplow.analytics.scalasdk.Data
import com.snowplowanalytics.snowplow.badrows.FailureDetails
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow
Expand All @@ -39,7 +41,7 @@ import java.net.URI

object ParquetSink {

def parquetSink[F[_]: Async, C](
def parquetSink[F[_]: Async: RegistryLookup, C](
resources: Resources[F, C],
compression: Compression,
maxRecordsPerFile: Long,
Expand All @@ -66,7 +68,7 @@ object ParquetSink {
}
}

private def createSchemaFromTypes[F[_]: Async, C](
private def createSchemaFromTypes[F[_]: Monad: Clock: RegistryLookup, C](
resources: Resources[F, C],
types: List[Data.ShreddedType]
): EitherT[F, FailureDetails.LoaderIgluError, MessageType] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks

import cats.effect.IO
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global

import java.time.Instant
Expand All @@ -24,8 +24,9 @@ import fs2.{Stream, text}
import io.circe.Json
import io.circe.optics.JsonPath._
import io.circe.parser.{parse => parseCirce}
import org.http4s.client.{Client => Http4sClient}
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.Registry
import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLookup, Registry, RegistryLookup}
import com.snowplowanalytics.iglu.schemaddl.Properties
import com.snowplowanalytics.lrumap.CreateLruMap
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
Expand Down Expand Up @@ -118,6 +119,12 @@ object TransformingSpec {
def data: Transformed.Data = value._2
}

/**
 * Registry lookup used by the specs. It wraps an http4s client whose handler ignores
 * the incoming request and raises a `RuntimeException`, so no real HTTP call is made.
 * NOTE(review): presumably the specs are expected to resolve every schema without a
 * remote lookup; how `Http4sRegistryLookup` surfaces this failure is not visible here.
 */
implicit val registryLookup: RegistryLookup[IO] = {
  // Client that fails every request it is asked to run.
  val failingClient = Http4sClient[IO] { _ =>
    Resource.eval(IO.raiseError(new RuntimeException("Unexpected registry lookup")))
  }
  Http4sRegistryLookup(failingClient)
}

val VersionPlaceholder = "version_placeholder"
val BadPathPrefix = "output=bad"
val DefaultTimestamp = "2020-09-29T10:38:56.653Z"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ package object common {

def isInputError(clientError: ClientError): Boolean =
clientError match {
case ClientError.ValidationError(_) =>
case ClientError.ValidationError(_, _) =>
false
case ClientError.ResolutionError(map) =>
map.values.toList.flatMap(_.errors.toList).exists {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.snowplowanalytics.snowplow.rdbloader.common

import cats.Id
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
import com.snowplowanalytics.iglu.schemaddl.parquet.Type.Nullability.{Nullable, Required}
import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.dsl.Monitoring.AlertPayload
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.batch
import cats.Id
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.iglu.client.{Client, Resolver}
import com.snowplowanalytics.iglu.client.validator.CirceValidator
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.batch
import cats.Id
import cats.implicits._
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.iglu.schemaddl.parquet.FieldValue
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.ParquetTransformer
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import cats.syntax.show._

import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.iglu.schemaddl.Properties

import com.snowplowanalytics.lrumap.CreateLruMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.batch

import cats.Id
import com.snowplowanalytics.iglu.client.Resolver
import com.snowplowanalytics.iglu.client.resolver.registries.JavaNetRegistryLookup._
import com.snowplowanalytics.snowplow.rdbloader.common.catsClockIdInstance
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.{AtomicFieldsProvider, NonAtomicFieldsProvider}
import com.snowplowanalytics.snowplow.rdbloader.common.transformation.parquet.fields.AllFields
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Dependencies {
object V {
// Scala (Loader)
val decline = "2.4.1"
val igluClient = "3.0.0-M1"
val igluClient = "3.0.0"
val igluCore = "1.1.1"
val badrows = "2.2.0"
val analyticsSdk = "3.1.0"
Expand Down

0 comments on commit 5e88175

Please sign in to comment.