Skip to content

Commit

Permalink
Merge pull request #254 from pluralsight/feature/create-streamV2-endp…
Browse files Browse the repository at this point in the history
…oint

Feature/create stream v2 endpoint
  • Loading branch information
lewisjkl authored Feb 19, 2020
2 parents 4d7fbd2 + 961f755 commit b00079b
Show file tree
Hide file tree
Showing 16 changed files with 256 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.actor.Props
* A trait that can be extended by classes providing services (actors) to be loaded during
* boot time for Hydra.
*
* The [[hydra.core.bootstrap.BootstrappingSupport]] trait scans the classpath for objects that extend this interface
* The [[BootstrappingSupport]] trait scans the classpath for objects that extend this interface
* and adds the services provided to the list of services to be managed by Hydra.
*
*/
Expand Down
2 changes: 2 additions & 0 deletions ingest/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ akka {
}

hydra {
v2.create-topic.enabled = false
v2.create-topic.enabled = ${?HYDRA_V2_CREATE_TOPIC_ENABLED}
monitoring.prometheus {
enable = false
enable = ${?MONITORING_PROMETHEUS_ENABLE}
Expand Down
4 changes: 3 additions & 1 deletion ingest/src/main/scala/hydra.ingest/app/Main.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package hydra.ingest.app

import akka.actor.ActorSystem
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import configs.syntax._
import hydra.common.logging.LoggingAdapter
import hydra.core.bootstrap.BootstrappingSupport
import hydra.ingest.bootstrap.BootstrappingSupport
import hydra.ingest.modules.{Algebras, Bootstrap, Programs}
import io.chrisdavenport.log4cats.SelfAwareStructuredLogger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
Expand All @@ -16,6 +17,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
// $COVERAGE-OFF$Disabling highlighting by default until a workaround for https://issues.scala-lang.org/browse/SI-8596 is found
object Main extends IOApp with BootstrappingSupport with LoggingAdapter {

private implicit val system: ActorSystem = containerService.system
private implicit val catsLogger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

private val oldBoostrap = IO(try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,66 @@
package hydra.core.bootstrap
package hydra.ingest.bootstrap

import java.lang.reflect.Modifier

import akka.actor.Props
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.RouteDirectives
import cats.effect.{IO, Timer}
import com.github.vonnagy.service.container.ContainerBuilder
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import com.github.vonnagy.service.container.listener.ContainerLifecycleListener
import com.github.vonnagy.service.container.service.ContainerService
import com.pluralsight.hydra.reflect.DoNotScan
import com.typesafe.config.ConfigFactory
import hydra.avro.registry.SchemaRegistry
import hydra.common.config.ConfigSupport
import hydra.common.logging.LoggingAdapter
import hydra.common.reflect.{ComponentInstantiator, ReflectionUtils}
import hydra.core.bootstrap.{CreateTopicProgram, ReflectionsWrapper, ServiceProvider}
import hydra.kafka.endpoints.BootstrapEndpointV2
import io.chrisdavenport.log4cats.Logger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
import retry.{RetryPolicies, RetryPolicy}

import scala.concurrent.ExecutionContext
import scala.util.Try

class BootstrapEndpoints(implicit val system: ActorSystem, implicit val ec: ExecutionContext) extends RoutedEndpoints {

private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val logger: Logger[IO] = Slf4jLogger.getLogger

private val schemaRegistryUrl = ConfigFactory.load().getString("hydra.schema.registry.url")

private val schemaRegistry = SchemaRegistry.live[IO](schemaRegistryUrl, 100).unsafeRunSync()

private val isBootstrapV2Enabled = ConfigFactory.load().getBoolean("hydra.v2.create-topic.enabled")

private val bootstrapV2Endpoint = {
if (isBootstrapV2Enabled) {
val retryPolicy: RetryPolicy[IO] = RetryPolicies.alwaysGiveUp
new BootstrapEndpointV2(new CreateTopicProgram[IO](schemaRegistry, retryPolicy)).route
} else {
RouteDirectives.reject
}
}

override def route: Route = bootstrapV2Endpoint
}

trait BootstrappingSupport extends ConfigSupport with LoggingAdapter {

import ReflectionsWrapper._

import scala.collection.JavaConverters._
import scala.util.control.Exception._

private def scanFor[T](clazz: Class[T]): Seq[Class[_ <: T]] = {
reflections.getSubTypesOf(clazz)
.asScala
.filterNot(c => Modifier.isAbstract(c.getModifiers))
.filterNot(c => c.isAnnotationPresent(classOf[DoNotScan])).toSeq
}

private val exceptionLogger = handling(classOf[Exception]) by { ex =>
log.error("Could not instantiate class.", ex); None
Expand All @@ -39,13 +79,6 @@ trait BootstrappingSupport extends ConfigSupport with LoggingAdapter {
exceptionLogger(Some(ComponentInstantiator.instantiate(clz, List(applicationConfig)).get))
}

private def scanFor[T](clazz: Class[T]): Seq[Class[_ <: T]] = {
reflections.getSubTypesOf(clazz)
.asScala
.filterNot(c => Modifier.isAbstract(c.getModifiers))
.filterNot(c => c.isAnnotationPresent(classOf[DoNotScan])).toSeq
}

def containerService: ContainerService = {
log.info(s"The following services will be started: ${services.map(_._1).mkString(", ")}")
ContainerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package hydra.ingest.http
import akka.actor._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.server.{ExceptionHandler, Rejection, Route}
import akka.stream.ActorMaterializer
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import configs.syntax._
import hydra.common.logging.LoggingAdapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import hydra.common.config.ConfigSupport
import hydra.common.logging.LoggingAdapter
import hydra.core.http.HydraDirectives
import hydra.ingest.bootstrap.HydraIngestorRegistryClient
import hydra.ingest.services.IngestorRegistry.{ FindAll, LookupResult }
import hydra.ingest.services.IngestorRegistry.{FindAll, LookupResult}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.duration.{FiniteDuration, _}

/**
* Created by alexsilva on 12/22/15.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.pattern.ask
import akka.util.Timeout
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import hydra.avro.resource.SchemaResource
import hydra.common.config.ConfigSupport
import hydra.common.logging.LoggingAdapter
Expand All @@ -33,6 +32,8 @@ import hydra.core.http.CorsSupport
import hydra.core.marshallers.{GenericServiceResponse, HydraJsonSupport}
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import org.apache.avro.SchemaParseException
import akka.http.scaladsl.server.Directives._
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,62 @@
package hydra.core.bootstrap
package hydra.ingest.bootstrap

import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.server.Route
import akka.testkit.TestKit
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import com.github.vonnagy.service.container.listener.ContainerLifecycleListener
import com.github.vonnagy.service.container.service.ContainerService
import hydra.core.ingest.TestIngestorDefault
import hydra.core.bootstrap.ServiceProvider
import hydra.core.ingest.{HydraRequest, Ingestor}
import hydra.core.transport.{AckStrategy, HydraRecord, RecordFactory}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}

import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

class TestIngestorDefault extends Ingestor {


/**
* This will _not_ override; instead it will use the default value of 1.second. We'll test it.
*/
override val initTimeout = 2.millisecond

val to = context.receiveTimeout

ingest {
case "hello" => sender ! "hi!"
case "timeout" => sender ! to
}

override val recordFactory = TestRecordFactory
}

object TestRecordFactory extends RecordFactory[String, String] {
override def build(r: HydraRequest)(implicit ec: ExecutionContext) = {
val timeout = r.metadataValueEquals("timeout", "true")
if (timeout) {
Future.successful(TimeoutRecord("test-topic", r.correlationId.toString, r.payload,
r.ackStrategy))
}
else {
Future.successful(TestRecord("test-topic", r.correlationId.toString, r.payload,
r.ackStrategy))
}
}
}

case class TestRecord(destination: String,
key: String,
payload: String,
ackStrategy: AckStrategy) extends HydraRecord[String, String]


case class TimeoutRecord(destination: String,
key: String,
payload: String,
ackStrategy: AckStrategy) extends HydraRecord[String, String]

/**
* Created by alexsilva on 3/7/17.
*/
class BootstrappingSupportSpec extends Matchers with FlatSpecLike with BootstrappingSupport with BeforeAndAfterAll {

val conf =
Expand All @@ -40,7 +83,7 @@ class BootstrappingSupportSpec extends Matchers with FlatSpecLike with Bootstrap

"The BootstrappingSupport trait" should
"load endpoints" in {
endpoints shouldBe Seq(classOf[DummyEndpoint])
endpoints should contain (classOf[DummyEndpoint])
}

it should "load listeners" in {
Expand All @@ -60,7 +103,6 @@ class BootstrappingSupportSpec extends Matchers with FlatSpecLike with Bootstrap
Seq("test" -> Props[TestIngestorDefault], "test2" -> Props[TestIngestorDefault])
, Seq(new DummyListener), "hydra_test")(container.system)
csvc.name shouldBe container.name
csvc.registeredRoutes shouldBe container.registeredRoutes
csvc.listeners.map(_.getClass) should contain(classOf[DummyListener])
csvc.name shouldBe container.name
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@ package hydra.ingest.http.mock

import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import hydra.ingest.http.SchemasEndpoint
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException

import scala.concurrent.{ExecutionContext, Future}

class MockEndpoint(implicit system: ActorSystem, implicit val e: ExecutionContext)
extends RoutedEndpoints {
class MockEndpoint(implicit system: ActorSystem, implicit val e: ExecutionContext) {

def throwRestClientException(statusCode: Int, errorCode: Int, errorMessage: String): Future[Any] = {
throw new RestClientException(errorMessage, statusCode, errorCode)
}

val schemaRouteExceptionHandler: ExceptionHandler = new SchemasEndpoint().excptHandler

override def route: Route = {
def route: Route = {
pathPrefix("throwRestClientException") {
handleExceptions(schemaRouteExceptionHandler) {
get {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,14 @@ import akka.actor._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import hydra.avro.registry.ConfluentSchemaRegistry
import hydra.common.logging.LoggingAdapter
import hydra.core.akka.SchemaRegistryActor
import hydra.core.http.{CorsSupport, HydraDirectives}
import hydra.core.marshallers.TopicMetadataRequest
import hydra.kafka.model.TopicMetadataAdapter
import hydra.kafka.services.TopicBootstrapActor._
import hydra.kafka.services.{StreamsManagerActor, TopicBootstrapActor}
import hydra.kafka.util.KafkaUtils

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2016 Pluralsight, LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package hydra.kafka.endpoints

import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.megard.akka.http.cors.scaladsl.CorsDirectives.cors
import hydra.core.bootstrap.CreateTopicProgram
import hydra.core.http.CorsSupport
import hydra.kafka.model.TopicMetadataV2Request
import hydra.kafka.serializers.TopicMetadataV2Parser

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

final class BootstrapEndpointV2(createTopicProgram: CreateTopicProgram[IO])
(implicit val system: ActorSystem, implicit val e: ExecutionContext) extends CorsSupport {

import TopicMetadataV2Parser._

val route: Route = cors(settings) {
path("v2" / "streams") {
post {
pathEndOrSingleSlash {
entity(as[TopicMetadataV2Request]) { t =>
onComplete(createTopicProgram.createTopic(t.subject.value, t.schemas.key, t.schemas.value).unsafeToFuture()) {
case Success(_) => complete(StatusCodes.OK)
case Failure(e) => complete(StatusCodes.InternalServerError, e)
}
}
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import scalacache._
import scalacache.guava.GuavaCache
import scalacache.modes.scalaFuture._

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import spray.json.{DefaultJsonProtocol, DeserializationException, JsArray, JsBoo
import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}

trait TopicMetadataV2Parser extends SprayJsonSupport with DefaultJsonProtocol with TopicMetadataV2Validator {
object TopicMetadataV2Parser extends TopicMetadataV2Parser

sealed trait TopicMetadataV2Parser extends SprayJsonSupport with DefaultJsonProtocol with TopicMetadataV2Validator {
implicit object SubjectFormat extends RootJsonFormat[Subject] {
override def write(obj: Subject): JsValue = {
JsString(obj.value)
Expand Down
Loading

0 comments on commit b00079b

Please sign in to comment.