Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/create stream v2 endpoint #254

Merged
merged 15 commits into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.actor.Props
* A trait that can be extended by classes providing services (actors) to be loaded during
* boot time for Hydra.
*
* The [[hydra.core.bootstrap.BootstrappingSupport]] trait scans the classpath for objects that extend this interface
* The [[BootstrappingSupport]] trait scans the classpath for objects that extend this interface
* and adds the services provided to the list of services to be managed by Hydra.
*
*/
Expand Down
2 changes: 2 additions & 0 deletions ingest/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ akka {
}

hydra {
v2.create-topic.enabled = false
v2.create-topic.enabled = ${?HYDRA_V2_CREATE_TOPIC_ENABLED}
monitoring.prometheus {
enable = false
enable = ${?MONITORING_PROMETHEUS_ENABLE}
Expand Down
4 changes: 3 additions & 1 deletion ingest/src/main/scala/hydra.ingest/app/Main.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package hydra.ingest.app

import akka.actor.ActorSystem
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import configs.syntax._
import hydra.common.logging.LoggingAdapter
import hydra.core.bootstrap.BootstrappingSupport
import hydra.ingest.bootstrap.BootstrappingSupport
import hydra.ingest.modules.{Algebras, Bootstrap, Programs}
import io.chrisdavenport.log4cats.SelfAwareStructuredLogger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
Expand All @@ -16,6 +17,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
// $COVERAGE-OFF$Disabling highlighting by default until a workaround for https://issues.scala-lang.org/browse/SI-8596 is found
object Main extends IOApp with BootstrappingSupport with LoggingAdapter {

private implicit val system: ActorSystem = containerService.system
private implicit val catsLogger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

private val oldBoostrap = IO(try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,66 @@
package hydra.core.bootstrap
package hydra.ingest.bootstrap

import java.lang.reflect.Modifier

import akka.actor.Props
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.RouteDirectives
import cats.effect.{IO, Timer}
import com.github.vonnagy.service.container.ContainerBuilder
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import com.github.vonnagy.service.container.listener.ContainerLifecycleListener
import com.github.vonnagy.service.container.service.ContainerService
import com.pluralsight.hydra.reflect.DoNotScan
import com.typesafe.config.ConfigFactory
import hydra.avro.registry.SchemaRegistry
import hydra.common.config.ConfigSupport
import hydra.common.logging.LoggingAdapter
import hydra.common.reflect.{ComponentInstantiator, ReflectionUtils}
import hydra.core.bootstrap.{CreateTopicProgram, ReflectionsWrapper, ServiceProvider}
import hydra.kafka.endpoints.BootstrapEndpointV2
import io.chrisdavenport.log4cats.Logger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
import retry.{RetryPolicies, RetryPolicy}

import scala.concurrent.ExecutionContext
import scala.util.Try

class BootstrapEndpoints(implicit val system: ActorSystem, implicit val ec: ExecutionContext) extends RoutedEndpoints {

private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
private implicit val logger: Logger[IO] = Slf4jLogger.getLogger

private val schemaRegistryUrl = ConfigFactory.load().getString("hydra.schema.registry.url")

private val schemaRegistry = SchemaRegistry.live[IO](schemaRegistryUrl, 100).unsafeRunSync()

private val isBootstrapV2Enabled = ConfigFactory.load().getBoolean("hydra.v2.create-topic.enabled")

private val bootstrapV2Endpoint = {
if (isBootstrapV2Enabled) {
val retryPolicy: RetryPolicy[IO] = RetryPolicies.alwaysGiveUp
new BootstrapEndpointV2(new CreateTopicProgram[IO](schemaRegistry, retryPolicy)).route
} else {
RouteDirectives.reject
}
}

override def route: Route = bootstrapV2Endpoint
}

trait BootstrappingSupport extends ConfigSupport with LoggingAdapter {

import ReflectionsWrapper._

import scala.collection.JavaConverters._
import scala.util.control.Exception._

private def scanFor[T](clazz: Class[T]): Seq[Class[_ <: T]] = {
reflections.getSubTypesOf(clazz)
.asScala
.filterNot(c => Modifier.isAbstract(c.getModifiers))
.filterNot(c => c.isAnnotationPresent(classOf[DoNotScan])).toSeq
}

private val exceptionLogger = handling(classOf[Exception]) by { ex =>
log.error("Could not instantiate class.", ex); None
Expand All @@ -39,13 +79,6 @@ trait BootstrappingSupport extends ConfigSupport with LoggingAdapter {
exceptionLogger(Some(ComponentInstantiator.instantiate(clz, List(applicationConfig)).get))
}

private def scanFor[T](clazz: Class[T]): Seq[Class[_ <: T]] = {
reflections.getSubTypesOf(clazz)
.asScala
.filterNot(c => Modifier.isAbstract(c.getModifiers))
.filterNot(c => c.isAnnotationPresent(classOf[DoNotScan])).toSeq
}

def containerService: ContainerService = {
log.info(s"The following services will be started: ${services.map(_._1).mkString(", ")}")
ContainerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package hydra.ingest.http
import akka.actor._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.server.{ExceptionHandler, Rejection, Route}
import akka.stream.ActorMaterializer
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import configs.syntax._
import hydra.common.logging.LoggingAdapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ import hydra.common.config.ConfigSupport
import hydra.common.logging.LoggingAdapter
import hydra.core.http.HydraDirectives
import hydra.ingest.bootstrap.HydraIngestorRegistryClient
import hydra.ingest.services.IngestorRegistry.{ FindAll, LookupResult }
import hydra.ingest.services.IngestorRegistry.{FindAll, LookupResult}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{ FiniteDuration, _ }
import scala.concurrent.duration.{FiniteDuration, _}

/**
* Created by alexsilva on 12/22/15.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.pattern.ask
import akka.util.Timeout
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import hydra.avro.resource.SchemaResource
import hydra.common.config.ConfigSupport
import hydra.common.logging.LoggingAdapter
Expand All @@ -33,6 +32,8 @@ import hydra.core.http.CorsSupport
import hydra.core.marshallers.{GenericServiceResponse, HydraJsonSupport}
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import org.apache.avro.SchemaParseException
import akka.http.scaladsl.server.Directives._
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,62 @@
package hydra.core.bootstrap
package hydra.ingest.bootstrap

import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.server.Route
import akka.testkit.TestKit
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import com.github.vonnagy.service.container.listener.ContainerLifecycleListener
import com.github.vonnagy.service.container.service.ContainerService
import hydra.core.ingest.TestIngestorDefault
import hydra.core.bootstrap.ServiceProvider
import hydra.core.ingest.{HydraRequest, Ingestor}
import hydra.core.transport.{AckStrategy, HydraRecord, RecordFactory}
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}

import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

class TestIngestorDefault extends Ingestor {


/**
* This will _not_ override; instead it will use the default value of 1.second. We'll test it.
*/
override val initTimeout = 2.millisecond

val to = context.receiveTimeout

ingest {
case "hello" => sender ! "hi!"
case "timeout" => sender ! to
}

override val recordFactory = TestRecordFactory
}

object TestRecordFactory extends RecordFactory[String, String] {
override def build(r: HydraRequest)(implicit ec: ExecutionContext) = {
val timeout = r.metadataValueEquals("timeout", "true")
if (timeout) {
Future.successful(TimeoutRecord("test-topic", r.correlationId.toString, r.payload,
r.ackStrategy))
}
else {
Future.successful(TestRecord("test-topic", r.correlationId.toString, r.payload,
r.ackStrategy))
}
}
}

case class TestRecord(destination: String,
key: String,
payload: String,
ackStrategy: AckStrategy) extends HydraRecord[String, String]


case class TimeoutRecord(destination: String,
key: String,
payload: String,
ackStrategy: AckStrategy) extends HydraRecord[String, String]

/**
* Created by alexsilva on 3/7/17.
*/
class BootstrappingSupportSpec extends Matchers with FlatSpecLike with BootstrappingSupport with BeforeAndAfterAll {

val conf =
Expand All @@ -40,7 +83,7 @@ class BootstrappingSupportSpec extends Matchers with FlatSpecLike with Bootstrap

"The BootstrappingSupport trait" should
"load endpoints" in {
endpoints shouldBe Seq(classOf[DummyEndpoint])
endpoints should contain (classOf[DummyEndpoint])
}

it should "load listeners" in {
Expand All @@ -60,7 +103,6 @@ class BootstrappingSupportSpec extends Matchers with FlatSpecLike with Bootstrap
Seq("test" -> Props[TestIngestorDefault], "test2" -> Props[TestIngestorDefault])
, Seq(new DummyListener), "hydra_test")(container.system)
csvc.name shouldBe container.name
csvc.registeredRoutes shouldBe container.registeredRoutes
csvc.listeners.map(_.getClass) should contain(classOf[DummyListener])
csvc.name shouldBe container.name
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@ package hydra.ingest.http.mock

import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import hydra.ingest.http.SchemasEndpoint
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException

import scala.concurrent.{ExecutionContext, Future}

class MockEndpoint(implicit system: ActorSystem, implicit val e: ExecutionContext)
extends RoutedEndpoints {
class MockEndpoint(implicit system: ActorSystem, implicit val e: ExecutionContext) {

def throwRestClientException(statusCode: Int, errorCode: Int, errorMessage: String): Future[Any] = {
throw new RestClientException(errorMessage, statusCode, errorCode)
}

val schemaRouteExceptionHandler: ExceptionHandler = new SchemasEndpoint().excptHandler

override def route: Route = {
def route: Route = {
pathPrefix("throwRestClientException") {
handleExceptions(schemaRouteExceptionHandler) {
get {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,14 @@ import akka.actor._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import com.github.vonnagy.service.container.http.routing.RoutedEndpoints
import hydra.avro.registry.ConfluentSchemaRegistry
import hydra.common.logging.LoggingAdapter
import hydra.core.akka.SchemaRegistryActor
import hydra.core.http.{CorsSupport, HydraDirectives}
import hydra.core.marshallers.TopicMetadataRequest
import hydra.kafka.model.TopicMetadataAdapter
import hydra.kafka.services.TopicBootstrapActor._
import hydra.kafka.services.{StreamsManagerActor, TopicBootstrapActor}
import hydra.kafka.util.KafkaUtils

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2016 Pluralsight, LLC.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package hydra.kafka.endpoints

import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import cats.effect.IO
import ch.megard.akka.http.cors.scaladsl.CorsDirectives.cors
import hydra.core.bootstrap.CreateTopicProgram
import hydra.core.http.CorsSupport
import hydra.kafka.model.TopicMetadataV2Request
import hydra.kafka.serializers.TopicMetadataV2Parser

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

final class BootstrapEndpointV2(createTopicProgram: CreateTopicProgram[IO])
(implicit val system: ActorSystem, implicit val e: ExecutionContext) extends CorsSupport {

import TopicMetadataV2Parser._

val route: Route = cors(settings) {
path("v2" / "streams") {
post {
pathEndOrSingleSlash {
entity(as[TopicMetadataV2Request]) { t =>
onComplete(createTopicProgram.createTopic(t.subject.value, t.schemas.key, t.schemas.value).unsafeToFuture()) {
case Success(_) => complete(StatusCodes.OK)
case Failure(e) => complete(StatusCodes.InternalServerError, e)
}
}
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import scalacache._
import scalacache.guava.GuavaCache
import scalacache.modes.scalaFuture._

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import spray.json.{DefaultJsonProtocol, DeserializationException, JsArray, JsBoo
import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}

trait TopicMetadataV2Parser extends SprayJsonSupport with DefaultJsonProtocol with TopicMetadataV2Validator {
object TopicMetadataV2Parser extends TopicMetadataV2Parser

sealed trait TopicMetadataV2Parser extends SprayJsonSupport with DefaultJsonProtocol with TopicMetadataV2Validator {
implicit object SubjectFormat extends RootJsonFormat[Subject] {
override def write(obj: Subject): JsValue = {
JsString(obj.value)
Expand Down
Loading