diff --git a/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala b/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala index fc8f59c4..1c3eb9cc 100644 --- a/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala +++ b/modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala @@ -21,11 +21,10 @@ import scala.concurrent.duration._ import scala.util.Properties import cats.effect.IO -import cats.effect.std.Queue +import cats.effect.kernel.Resource import cats.syntax.all._ import com.dimafeng.testcontainers.GenericContainer -import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures import com.permutive.common.types.gcp.http4s._ import fs2.Chunk import fs2.pubsub.dsl.client.PubSubClientStep @@ -39,7 +38,7 @@ import org.http4s.client.dsl.io._ import org.http4s.ember.client.EmberClientBuilder import org.testcontainers.containers.wait.strategy.Wait -class PubSubSuite extends CatsEffectSuite with TestContainersFixtures { +class PubSubSuite extends CatsEffectSuite { @nowarn("msg=deprecated") val options = @@ -109,81 +108,70 @@ class PubSubSuite extends CatsEffectSuite with TestContainersFixtures { // Fixtures // ////////////// - val projects = List.fill(8)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) => - s"test-project-${index + 1},$topics" - } - - val projectsFixture = ResourceSuiteLocalFixture( - "Projects", - Queue - .unbounded[IO, ProjectId] - .flatTap(queue => projects.map(_.split(",").head).traverse(ProjectId.fromStringF[IO](_).flatMap(queue.offer))) - .toResource - ) - def afterProducing(constructor: PubSubClientStep[IO], records: Int, withAckDeadlineSeconds: Int = 10) = ResourceFunFixture { - IO(projectsFixture().take).flatten.toResource - .product(EmberClientBuilder.default[IO].withHttp2.build) - .evalTap { case (projectId, client) => - val body = Json.obj( - "subscription" := Json.obj( - "topic" := "example-topic", + val projectId = ProjectId("test-project") + + Resource.fromAutoCloseable(IO(container).flatTap(container => IO(container.start()))) >> + EmberClientBuilder + .default[IO] + .withHttp2 + .build + .evalTap { client => + val body = Json.obj( + "topic" := "projects/test-project/topics/example-topic", "ackDeadlineSeconds" := withAckDeadlineSeconds - ), - "updateMask" := "ackDeadlineSeconds" - ) - - val request = - PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription") - - client.expect[Unit](request) - } - .map { case (projectId, client) => - val pubSubClient = constructor - .projectId(projectId) - .uri(container.uri) - .httpClient(client) - .noRetry - - val publisher = pubSubClient - .publisher[String] - .topic(Topic("example-topic")) - - val subscriber = pubSubClient.subscriber - .subscription(Subscription("example-subscription")) - .errorHandler { - case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t) - case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t) - case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack - } - .withDefaults - .decodeTo[String] - .subscribe - - (publisher, subscriber) - } - .evalTap { - case (publisher, _) if records === 1 => publisher.publishOne("ping") - case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping"))) - } - ._2F + ) + + val requests = List( + PUT(container.uri / "v1" / "projects" / projectId / "topics" / "example-topic"), + PUT(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription") + ) + + requests.traverse_(client.expect[Unit]) + } + .map { client => + val pubSubClient = constructor + .projectId(projectId) + .uri(container.uri) + .httpClient(client) + .noRetry + + val publisher = pubSubClient + .publisher[String] + .topic(Topic("example-topic")) + + val subscriber = pubSubClient.subscriber + .subscription(Subscription("example-subscription")) + .errorHandler { + case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t) + case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t) + case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack + } + .withDefaults + .decodeTo[String] + .subscribe + + (publisher, subscriber) + } + .evalTap { + case (publisher, _) if records === 1 => publisher.publishOne("ping") + case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping"))) + } + ._2F } case object container extends GenericContainer( - "thekevjames/gcloud-pubsub-emulator:450.0.0", - exposedPorts = Seq(8681, 8682), - waitStrategy = Wait.forListeningPort().some, - env = projects.zipWithIndex.map { case (project, index) => s"PUBSUB_PROJECT${index + 1}" -> project }.toMap + "google/cloud-sdk:emulators", + command = "gcloud" :: "beta" :: "emulators" :: "pubsub" :: "start" :: "--project=test-project" + :: "--host-port=0.0.0.0:8085" :: Nil, + exposedPorts = Seq(8085), + waitStrategy = Wait.forLogMessage(".*Server started, listening on 8085.*", 1).some ) { - def uri = Uri.unsafeFromString(s"http://localhost:${mappedPort(8681)}") + def uri = Uri.unsafeFromString(s"http://localhost:${mappedPort(8085)}") } - val containerFixture = new ForAllContainerFixture[GenericContainer](container) - - override def munitFixtures = super.munitFixtures :+ projectsFixture :+ containerFixture - } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 45b5ec08..76a926af 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,8 +15,8 @@ object Dependencies { lazy val `http4s-grpc` = "io.chrisdavenport" %% "http4s-grpc" % "0.0.4" lazy val grpc = Seq( - "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.108.6", - "com.google.api.grpc" % "proto-google-common-protos" % "2.39.1", + "com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.113.0", + "com.google.api.grpc" % "proto-google-common-protos" % "2.41.0", "com.google.protobuf" % "protobuf-java" % "3.25.3" ).map(_ % "protobuf-src" intransitive ()) ++ Seq( "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf" @@ -25,7 +25,7 @@ object Dependencies { lazy val `fs2-pubsub` = Seq( "co.fs2" %% "fs2-core" % "3.10.2", "com.permutive" %% "common-types-gcp-http4s" % "1.0.0", - "io.circe" %% "circe-parser" % "0.14.7", + "io.circe" %% "circe-parser" % "0.14.9", "org.http4s" %% "http4s-circe" % "0.23.27", "org.http4s" %% "http4s-client" % "0.23.27", "org.http4s" %% "http4s-dsl" % "0.23.27" @@ -38,7 +38,7 @@ object Dependencies { ).map(_ % Test) lazy val `fs2-pubsub-pureconfig` = Seq( - "com.github.pureconfig" %% "pureconfig-http4s" % "0.17.6", + "com.github.pureconfig" %% "pureconfig-http4s" % "0.17.7", "com.permutive" %% "common-types-gcp-pureconfig" % "1.0.0" ) diff --git a/project/build.properties b/project/build.properties index 04267b14..ee4c672c 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.9 +sbt.version=1.10.1 diff --git a/project/plugins.sbt b/project/plugins.sbt index 8f04a4e0..6ed83423 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -12,6 +12,6 @@ addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1. addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0") addSbtPlugin("ch.epfl.scala" % "sbt-version-policy" % "3.2.1") addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1") -addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.2") +addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.4") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") addSbtPlugin("io.chrisdavenport" % "sbt-http4s-grpc" % "0.0.4")