Skip to content

Commit

Permalink
Merge pull request #253 from pluralsight/feature/bootstrapLogic
Browse files Browse the repository at this point in the history
Bootstrap Topic Creation
  • Loading branch information
lewisjkl authored Feb 14, 2020
2 parents d398293 + 7a2680c commit 4d7fbd2
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 26 deletions.
70 changes: 70 additions & 0 deletions ingest/src/main/scala/hydra.ingest/app/AppConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package hydra.ingest.app

import cats.implicits._
import ciris.{ConfigValue, _}
import hydra.kafka.config.MetadataSchemaConfig
import hydra.kafka.model.Subject
import org.apache.avro.Schema

import scala.concurrent.duration._

object AppConfig {

final case class SchemaRegistryConfig(
fullUrl: String,
maxCacheSize: Int
)

private val schemaRegistryConfig: ConfigValue[SchemaRegistryConfig] =
(
env("HYDRA_SCHEMA_REGISTRY_URL").as[String].default("http://localhost:8081"),
env("HYDRA_MAX_SCHEMAS_PER_SUBJECT").as[Int].default(1000)
)
.parMapN(SchemaRegistryConfig)

final case class CreateTopicConfig(
schemaRegistryConfig: SchemaRegistryConfig,
numRetries: Int,
baseBackoffDelay: FiniteDuration
)

private val createTopicConfig: ConfigValue[CreateTopicConfig] =
(
schemaRegistryConfig,
env("CREATE_TOPIC_NUM_RETRIES").as[Int].default(1),
env("CREATE_TOPIC_BASE_BACKOFF_DELAY").as[FiniteDuration].default(1.second)
)
.parMapN(CreateTopicConfig)

private implicit val subjectConfigDecoder: ConfigDecoder[String, Subject] =
ConfigDecoder[String, String].mapOption("Subject")(Subject.createValidated)

final case class V2MetadataTopicConfig(
topicName: Subject,
keySchema: Schema,
valueSchema: Schema,
createOnStartup: Boolean
)

private val v2MetadataTopicConfig: ConfigValue[V2MetadataTopicConfig] =
(
env("HYDRA_V2_METADATA_TOPIC_NAME").as[Subject].default(Subject.createValidated("_hydra.v2.metadata").get),
env("HYDRA_V2_METADATA_CREATE_ON_STARTUP").as[Boolean].default(false)
)
.parMapN { (subject, createOnStartup) =>
V2MetadataTopicConfig(subject, MetadataSchemaConfig.keySchema, MetadataSchemaConfig.valueSchema, createOnStartup)
}

final case class AppConfig(
createTopicConfig: CreateTopicConfig,
v2MetadataTopicConfig: V2MetadataTopicConfig
)

val appConfig: ConfigValue[AppConfig] =
(
createTopicConfig,
v2MetadataTopicConfig
)
.parMapN(AppConfig)

}
27 changes: 25 additions & 2 deletions ingest/src/main/scala/hydra.ingest/app/Main.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
package hydra.ingest.app

import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import configs.syntax._
import hydra.common.logging.LoggingAdapter
import hydra.core.bootstrap.BootstrappingSupport
import hydra.ingest.modules.{Algebras, Bootstrap, Programs}
import io.chrisdavenport.log4cats.SelfAwareStructuredLogger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
import kamon.Kamon
import kamon.prometheus.PrometheusReporter

import scala.concurrent.ExecutionContext.Implicits.global

// $COVERAGE-OFF$Disabling highlighting by default until a workaround for https://issues.scala-lang.org/browse/SI-8596 is found
object Main extends App with BootstrappingSupport with LoggingAdapter {
try {
object Main extends IOApp with BootstrappingSupport with LoggingAdapter {

private implicit val catsLogger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

private val oldBoostrap = IO(try {
val enablePrometheus = applicationConfig
.get[Boolean]("monitoring.prometheus.enable")
.valueOrElse(false)
Expand All @@ -28,7 +36,22 @@ object Main extends App with BootstrappingSupport with LoggingAdapter {
.map(_ => containerService.shutdown())
.onComplete(throw e)
}
})

private val mainProgram = AppConfig.appConfig.load[IO].flatMap { config =>
for {
algebras <- Algebras.make[IO](config.createTopicConfig.schemaRegistryConfig)
programs <- Programs.make[IO](config.createTopicConfig, algebras)
bootstrap <- Bootstrap.make[IO](programs.createTopic, config.v2MetadataTopicConfig)
_ <- bootstrap.bootstrapAll
_ <- oldBoostrap
} yield ()
}

override def run(args: List[String]): IO[ExitCode] = {
mainProgram.as(ExitCode.Success)
}

}

// $COVERAGE-ON
19 changes: 19 additions & 0 deletions ingest/src/main/scala/hydra.ingest/modules/Algebras.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package hydra.ingest.modules

import hydra.avro.registry.SchemaRegistry
import hydra.ingest.app.AppConfig.SchemaRegistryConfig
import cats.effect.Sync
import cats.implicits._

final class Algebras[F[_]] private (
val schemaRegistry: SchemaRegistry[F]
)

object Algebras {
def make[F[_]: Sync](
schemaRegistryConfig: SchemaRegistryConfig
): F[Algebras[F]] =
for {
schemaRegistry <- SchemaRegistry.live[F](schemaRegistryConfig.fullUrl, schemaRegistryConfig.maxCacheSize)
} yield new Algebras[F](schemaRegistry)
}
33 changes: 33 additions & 0 deletions ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package hydra.ingest.modules

import cats.effect.Sync
import hydra.core.bootstrap.CreateTopicProgram
import cats.implicits._
import hydra.ingest.app.AppConfig.V2MetadataTopicConfig

final class Bootstrap[F[_]: Sync] private (
createTopicProgram: CreateTopicProgram[F],
cfg: V2MetadataTopicConfig
) {

def bootstrapAll: F[Unit] = for {
_ <- bootstrapMetadataTopic
} yield ()

private def bootstrapMetadataTopic: F[Unit] =
if (cfg.createOnStartup) {
createTopicProgram.createTopic(cfg.topicName.value, cfg.keySchema, cfg.valueSchema)
} else {
Sync[F].unit
}

}

object Bootstrap {
def make[F[_]: Sync](
createTopicProgram: CreateTopicProgram[F],
v2MetadataTopicConfig: V2MetadataTopicConfig
): F[Bootstrap[F]] = Sync[F].delay {
new Bootstrap[F](createTopicProgram, v2MetadataTopicConfig)
}
}
35 changes: 35 additions & 0 deletions ingest/src/main/scala/hydra.ingest/modules/Programs.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package hydra.ingest.modules

import cats.effect._
import cats.implicits._
import hydra.core.bootstrap.CreateTopicProgram
import hydra.ingest.app.AppConfig.CreateTopicConfig
import io.chrisdavenport.log4cats.Logger
import retry.RetryPolicies._
import retry.RetryPolicy

final class Programs[F[_]: Logger: Sync: Timer] private (
cfg: CreateTopicConfig,
algebras: Algebras[F]
) {

val retryPolicy: RetryPolicy[F] =
limitRetries[F](cfg.numRetries) |+| exponentialBackoff[F](cfg.baseBackoffDelay)

val createTopic: CreateTopicProgram[F] = new CreateTopicProgram[F](
algebras.schemaRegistry,
retryPolicy
)

}

object Programs {

def make[F[_]: Logger: Sync: Timer](
createTopicConfig: CreateTopicConfig,
algebras: Algebras[F]
): F[Programs[F]] = Sync[F].delay {
new Programs[F](createTopicConfig, algebras)
}

}
19 changes: 19 additions & 0 deletions kafka/src/main/scala/hydra/kafka/config/MetadataSchemaConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package hydra.kafka.config

import org.apache.avro.Schema
import org.apache.avro.Schema.Parser

import scala.io.Source

object MetadataSchemaConfig {

private def parseSchema(fileName: String): Schema = {
val jsonSchemaString: String = Source.fromResource(fileName).mkString
new Parser().parse(jsonSchemaString)
}

lazy val keySchema: Schema = parseSchema("HydraMetadataTopicKeyV2.avsc")

lazy val valueSchema: Schema = parseSchema("HydraMetadataTopicValueV2.avsc")

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package hydra.kafka.config

import org.apache.avro.Schema
import org.scalatest.{FlatSpec, Matchers}

class MetadataSchemaConfigSpec extends FlatSpec with Matchers {

it should "be able to parse the HydraMetadataTopicValueV2.avsc into the Schema class" in {
MetadataSchemaConfig.keySchema shouldBe a[Schema]
}

it should "be able to parse the HydraMetadataTopicKeyV2.avsc into the Schema Class" in {
MetadataSchemaConfig.valueSchema shouldBe a[Schema]
}

}
5 changes: 4 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ object Dependencies {
val catsLoggerVersion = "1.0.1"
val catsRetryVersion = "1.0.0"
val catsVersion = "2.0.0"
val cirisVersion = "1.0.3"
val confluentVersion = "5.4.0"
val easyMockVersion = "3.5" //needed for mocking static java methods
val hikariCPVersion = "2.6.2"
Expand Down Expand Up @@ -49,6 +50,8 @@ object Dependencies {
"com.github.cb372" %% "cats-retry" % catsRetryVersion
)

val ciris = "is.cir" %% "ciris" % cirisVersion

val scalaConfigs = "com.github.kxbmap" %% "configs" % kxbmapConfigVersion

val typesafeConfig = "com.typesafe" % "config" % typesafeConfigVersion
Expand Down Expand Up @@ -176,7 +179,7 @@ object Dependencies {
Seq(guavacache, reflections, akkaKryo, serviceContainer, sdNotify, postgres, h2db, retry, catsLogger) ++
confluent ++ kamon ++ aeron

val ingestDeps: Seq[ModuleID] = coreDeps ++ akkaHttpHal
val ingestDeps: Seq[ModuleID] = coreDeps ++ akkaHttpHal ++ Seq(ciris)

val rabbitDeps: Seq[ModuleID] = logging ++ Seq(scalaConfigs) ++ joda ++ opRabbit ++ testDeps

Expand Down

0 comments on commit 4d7fbd2

Please sign in to comment.