diff --git a/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala new file mode 100644 index 000000000..af972be87 --- /dev/null +++ b/ingest/src/main/scala/hydra.ingest/app/AppConfig.scala @@ -0,0 +1,70 @@ +package hydra.ingest.app + +import cats.implicits._ +import ciris.{ConfigValue, _} +import hydra.kafka.config.MetadataSchemaConfig +import hydra.kafka.model.Subject +import org.apache.avro.Schema + +import scala.concurrent.duration._ + +object AppConfig { + + final case class SchemaRegistryConfig( + fullUrl: String, + maxCacheSize: Int + ) + + private val schemaRegistryConfig: ConfigValue[SchemaRegistryConfig] = + ( + env("HYDRA_SCHEMA_REGISTRY_URL").as[String].default("http://localhost:8081"), + env("HYDRA_MAX_SCHEMAS_PER_SUBJECT").as[Int].default(1000) + ) + .parMapN(SchemaRegistryConfig) + + final case class CreateTopicConfig( + schemaRegistryConfig: SchemaRegistryConfig, + numRetries: Int, + baseBackoffDelay: FiniteDuration + ) + + private val createTopicConfig: ConfigValue[CreateTopicConfig] = + ( + schemaRegistryConfig, + env("CREATE_TOPIC_NUM_RETRIES").as[Int].default(1), + env("CREATE_TOPIC_BASE_BACKOFF_DELAY").as[FiniteDuration].default(1.second) + ) + .parMapN(CreateTopicConfig) + + private implicit val subjectConfigDecoder: ConfigDecoder[String, Subject] = + ConfigDecoder[String, String].mapOption("Subject")(Subject.createValidated) + + final case class V2MetadataTopicConfig( + topicName: Subject, + keySchema: Schema, + valueSchema: Schema, + createOnStartup: Boolean + ) + + private val v2MetadataTopicConfig: ConfigValue[V2MetadataTopicConfig] = + ( + env("HYDRA_V2_METADATA_TOPIC_NAME").as[Subject].default(Subject.createValidated("_hydra.v2.metadata").get), + env("HYDRA_V2_METADATA_CREATE_ON_STARTUP").as[Boolean].default(false) + ) + .parMapN { (subject, createOnStartup) => + V2MetadataTopicConfig(subject, MetadataSchemaConfig.keySchema, MetadataSchemaConfig.valueSchema, createOnStartup) + } + + final case class AppConfig( + createTopicConfig: CreateTopicConfig, + v2MetadataTopicConfig: V2MetadataTopicConfig + ) + + val appConfig: ConfigValue[AppConfig] = + ( + createTopicConfig, + v2MetadataTopicConfig + ) + .parMapN(AppConfig) + +} \ No newline at end of file diff --git a/ingest/src/main/scala/hydra.ingest/app/Main.scala b/ingest/src/main/scala/hydra.ingest/app/Main.scala index 2f3ac7d2b..ca2855586 100644 --- a/ingest/src/main/scala/hydra.ingest/app/Main.scala +++ b/ingest/src/main/scala/hydra.ingest/app/Main.scala @@ -1,16 +1,24 @@ package hydra.ingest.app +import cats.effect.{ExitCode, IO, IOApp} +import cats.implicits._ import configs.syntax._ import hydra.common.logging.LoggingAdapter import hydra.core.bootstrap.BootstrappingSupport +import hydra.ingest.modules.{Algebras, Bootstrap, Programs} +import io.chrisdavenport.log4cats.SelfAwareStructuredLogger +import io.chrisdavenport.log4cats.slf4j.Slf4jLogger import kamon.Kamon import kamon.prometheus.PrometheusReporter + import scala.concurrent.ExecutionContext.Implicits.global // $COVERAGE-OFF$Disabling highlighting by default until a workaround for https://issues.scala-lang.org/browse/SI-8596 is found -object Main extends App with BootstrappingSupport with LoggingAdapter { - try { +object Main extends IOApp with BootstrappingSupport with LoggingAdapter { + + private implicit val catsLogger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO] + private val oldBoostrap = IO(try { val enablePrometheus = applicationConfig .get[Boolean]("monitoring.prometheus.enable") .valueOrElse(false) @@ -28,7 +36,22 @@ object Main extends App with BootstrappingSupport with LoggingAdapter { .map(_ => containerService.shutdown()) .onComplete(throw e) } + }) + + private val mainProgram = AppConfig.appConfig.load[IO].flatMap { config => + for { + algebras <- Algebras.make[IO](config.createTopicConfig.schemaRegistryConfig) + programs <- Programs.make[IO](config.createTopicConfig, algebras) + bootstrap <- Bootstrap.make[IO](programs.createTopic, config.v2MetadataTopicConfig) + _ <- bootstrap.bootstrapAll + _ <- oldBoostrap + } yield () } + + override def run(args: List[String]): IO[ExitCode] = { + mainProgram.as(ExitCode.Success) + } + } // $COVERAGE-ON diff --git a/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala new file mode 100644 index 000000000..3c4747e07 --- /dev/null +++ b/ingest/src/main/scala/hydra.ingest/modules/Algebras.scala @@ -0,0 +1,19 @@ +package hydra.ingest.modules + +import hydra.avro.registry.SchemaRegistry +import hydra.ingest.app.AppConfig.SchemaRegistryConfig +import cats.effect.Sync +import cats.implicits._ + +final class Algebras[F[_]] private ( + val schemaRegistry: SchemaRegistry[F] +) + +object Algebras { + def make[F[_]: Sync]( + schemaRegistryConfig: SchemaRegistryConfig + ): F[Algebras[F]] = + for { + schemaRegistry <- SchemaRegistry.live[F](schemaRegistryConfig.fullUrl, schemaRegistryConfig.maxCacheSize) + } yield new Algebras[F](schemaRegistry) +} \ No newline at end of file diff --git a/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala b/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala new file mode 100644 index 000000000..d9b0f8717 --- /dev/null +++ b/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala @@ -0,0 +1,33 @@ +package hydra.ingest.modules + +import cats.effect.Sync +import hydra.core.bootstrap.CreateTopicProgram +import cats.implicits._ +import hydra.ingest.app.AppConfig.V2MetadataTopicConfig + +final class Bootstrap[F[_]: Sync] private ( + createTopicProgram: CreateTopicProgram[F], + cfg: V2MetadataTopicConfig + ) { + + def bootstrapAll: F[Unit] = for { + _ <- bootstrapMetadataTopic + } yield () + + private def bootstrapMetadataTopic: F[Unit] = + if (cfg.createOnStartup) { + createTopicProgram.createTopic(cfg.topicName.value, cfg.keySchema, cfg.valueSchema) + } else { + Sync[F].unit + } + +} + +object Bootstrap { + def make[F[_]: Sync]( + createTopicProgram: CreateTopicProgram[F], + v2MetadataTopicConfig: V2MetadataTopicConfig + ): F[Bootstrap[F]] = Sync[F].delay { + new Bootstrap[F](createTopicProgram, v2MetadataTopicConfig) + } +} diff --git a/ingest/src/main/scala/hydra.ingest/modules/Programs.scala b/ingest/src/main/scala/hydra.ingest/modules/Programs.scala new file mode 100644 index 000000000..958876f69 --- /dev/null +++ b/ingest/src/main/scala/hydra.ingest/modules/Programs.scala @@ -0,0 +1,35 @@ +package hydra.ingest.modules + +import cats.effect._ +import cats.implicits._ +import hydra.core.bootstrap.CreateTopicProgram +import hydra.ingest.app.AppConfig.CreateTopicConfig +import io.chrisdavenport.log4cats.Logger +import retry.RetryPolicies._ +import retry.RetryPolicy + +final class Programs[F[_]: Logger: Sync: Timer] private ( + cfg: CreateTopicConfig, + algebras: Algebras[F] +) { + + val retryPolicy: RetryPolicy[F] = + limitRetries[F](cfg.numRetries) |+| exponentialBackoff[F](cfg.baseBackoffDelay) + + val createTopic: CreateTopicProgram[F] = new CreateTopicProgram[F]( + algebras.schemaRegistry, + retryPolicy + ) + +} + +object Programs { + + def make[F[_]: Logger: Sync: Timer]( + createTopicConfig: CreateTopicConfig, + algebras: Algebras[F] + ): F[Programs[F]] = Sync[F].delay { + new Programs[F](createTopicConfig, algebras) + } + +} diff --git a/kafka/src/main/scala/hydra/kafka/config/MetadataSchemaConfig.scala b/kafka/src/main/scala/hydra/kafka/config/MetadataSchemaConfig.scala new file mode 100644 index 000000000..347aa3dde --- /dev/null +++ b/kafka/src/main/scala/hydra/kafka/config/MetadataSchemaConfig.scala @@ -0,0 +1,19 @@ +package hydra.kafka.config + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser + +import scala.io.Source + +object MetadataSchemaConfig { + + private def parseSchema(fileName: String): Schema = { + val jsonSchemaString: String = Source.fromResource(fileName).mkString + new Parser().parse(jsonSchemaString) + } + + lazy val keySchema: Schema = parseSchema("HydraMetadataTopicKeyV2.avsc") + + lazy val valueSchema: Schema = parseSchema("HydraMetadataTopicValueV2.avsc") + +} diff --git a/kafka/src/test/scala/hydra/kafka/HydraMetadataTopicV2SchemaSpec.scala b/kafka/src/test/scala/hydra/kafka/HydraMetadataTopicV2SchemaSpec.scala deleted file mode 100644 index a4b9b6d0c..000000000 --- a/kafka/src/test/scala/hydra/kafka/HydraMetadataTopicV2SchemaSpec.scala +++ /dev/null @@ -1,23 +0,0 @@ -package hydra.kafka - -import org.scalatest.FlatSpec -import org.scalatest.Matchers -import org.apache.avro.Schema.Parser -import scala.io.Source -import org.apache.avro.Schema - -class HydraMetadataTopicV2SchemaSpec extends FlatSpec with Matchers { - - it should "be able to parse the HydraMetadataTopicValueV2.avsc into the Schema class" in { - val jsonSchemaString: String = Source.fromResource("HydraMetadataTopicValueV2.avsc").mkString - val schema = new Parser().parse(jsonSchemaString) - schema shouldBe a[Schema] - } - - it should "be able to parse the HydraMetadataTopicKeyV2.avsc into the Schema Class" in { - val jsonSchemaString: String = Source.fromResource("HydraMetadataTopicKeyV2.avsc").mkString - val schema = new Parser().parse(jsonSchemaString) - schema shouldBe a[Schema] - } - -} \ No newline at end of file diff --git a/kafka/src/test/scala/hydra/kafka/config/MetadataSchemaConfigSpec.scala b/kafka/src/test/scala/hydra/kafka/config/MetadataSchemaConfigSpec.scala new file mode 100644 index 000000000..663cbc8d1 --- /dev/null +++ b/kafka/src/test/scala/hydra/kafka/config/MetadataSchemaConfigSpec.scala @@ -0,0 +1,16 @@ +package hydra.kafka.config + +import org.apache.avro.Schema +import org.scalatest.{FlatSpec, Matchers} + +class MetadataSchemaConfigSpec extends FlatSpec with Matchers { + + it should "be able to parse the HydraMetadataTopicValueV2.avsc into the Schema class" in { + MetadataSchemaConfig.keySchema shouldBe a[Schema] + } + + it should "be able to parse the HydraMetadataTopicKeyV2.avsc into the Schema Class" in { + MetadataSchemaConfig.valueSchema shouldBe a[Schema] + } + +} \ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1e2e84a36..2d08601f4 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,6 +15,7 @@ object Dependencies { val catsLoggerVersion = "1.0.1" val catsRetryVersion = "1.0.0" val catsVersion = "2.0.0" + val cirisVersion = "1.0.3" val confluentVersion = "5.4.0" val easyMockVersion = "3.5" //needed for mocking static java methods val hikariCPVersion = "2.6.2" @@ -49,6 +50,8 @@ object Dependencies { "com.github.cb372" %% "cats-retry" % catsRetryVersion ) + val ciris = "is.cir" %% "ciris" % cirisVersion + val scalaConfigs = "com.github.kxbmap" %% "configs" % kxbmapConfigVersion val typesafeConfig = "com.typesafe" % "config" % typesafeConfigVersion @@ -176,7 +179,7 @@ object Dependencies { Seq(guavacache, reflections, akkaKryo, serviceContainer, sdNotify, postgres, h2db, retry, catsLogger) ++ confluent ++ kamon ++ aeron - val ingestDeps: Seq[ModuleID] = coreDeps ++ akkaHttpHal + val ingestDeps: Seq[ModuleID] = coreDeps ++ akkaHttpHal ++ Seq(ciris) val rabbitDeps: Seq[ModuleID] = logging ++ Seq(scalaConfigs) ++ joda ++ opRabbit ++ testDeps