Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dependency Updates #539

Merged
merged 11 commits into from
Jul 23, 2024
124 changes: 56 additions & 68 deletions modules/fs2-pubsub/src/test/scala/fs2/pubsub/PubSubSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import scala.concurrent.duration._
import scala.util.Properties

import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.kernel.Resource
import cats.syntax.all._

import com.dimafeng.testcontainers.GenericContainer
import com.dimafeng.testcontainers.munit.fixtures.TestContainersFixtures
import com.permutive.common.types.gcp.http4s._
import fs2.Chunk
import fs2.pubsub.dsl.client.PubSubClientStep
Expand All @@ -39,7 +38,7 @@ import org.http4s.client.dsl.io._
import org.http4s.ember.client.EmberClientBuilder
import org.testcontainers.containers.wait.strategy.Wait

class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {
class PubSubSuite extends CatsEffectSuite {

@nowarn("msg=deprecated")
val options =
Expand Down Expand Up @@ -109,81 +108,70 @@ class PubSubSuite extends CatsEffectSuite with TestContainersFixtures {
// Fixtures //
//////////////

val projects = List.fill(8)("example-topic:example-subscription").zipWithIndex.map { case (topics, index) =>
s"test-project-${index + 1},$topics"
}

val projectsFixture = ResourceSuiteLocalFixture(
"Projects",
Queue
.unbounded[IO, ProjectId]
.flatTap(queue => projects.map(_.split(",").head).traverse(ProjectId.fromStringF[IO](_).flatMap(queue.offer)))
.toResource
)

def afterProducing(constructor: PubSubClientStep[IO], records: Int, withAckDeadlineSeconds: Int = 10) =
ResourceFunFixture {
IO(projectsFixture().take).flatten.toResource
.product(EmberClientBuilder.default[IO].withHttp2.build)
.evalTap { case (projectId, client) =>
val body = Json.obj(
"subscription" := Json.obj(
"topic" := "example-topic",
val projectId = ProjectId("test-project")

Resource.fromAutoCloseable(IO(container).flatTap(container => IO(container.start()))) >>
EmberClientBuilder
.default[IO]
.withHttp2
.build
.evalTap { client =>
val body = Json.obj(
"topic" := "projects/test-project/topics/example-topic",
"ackDeadlineSeconds" := withAckDeadlineSeconds
),
"updateMask" := "ackDeadlineSeconds"
)

val request =
PATCH(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")

client.expect[Unit](request)
}
.map { case (projectId, client) =>
val pubSubClient = constructor
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
)

val requests = List(
PUT(container.uri / "v1" / "projects" / projectId / "topics" / "example-topic"),
PUT(body, container.uri / "v1" / "projects" / projectId / "subscriptions" / "example-subscription")
)

requests.traverse_(client.expect[Unit])
}
.map { client =>
val pubSubClient = constructor
.projectId(projectId)
.uri(container.uri)
.httpClient(client)
.noRetry

val publisher = pubSubClient
.publisher[String]
.topic(Topic("example-topic"))

val subscriber = pubSubClient.subscriber
.subscription(Subscription("example-subscription"))
.errorHandler {
case (PubSubSubscriber.Operation.Ack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Nack(_), t) => IO.println(t)
case (PubSubSubscriber.Operation.Decode(record), t) => IO.println(t) >> record.ack
}
.withDefaults
.decodeTo[String]
.subscribe

(publisher, subscriber)
}
.evalTap {
case (publisher, _) if records === 1 => publisher.publishOne("ping")
case (publisher, _) => publisher.publishMany(List.fill(records)(PubSubRecord.Publisher("ping")))
}
._2F
}

case object container
extends GenericContainer(
"thekevjames/gcloud-pubsub-emulator:450.0.0",
exposedPorts = Seq(8681, 8682),
waitStrategy = Wait.forListeningPort().some,
env = projects.zipWithIndex.map { case (project, index) => s"PUBSUB_PROJECT${index + 1}" -> project }.toMap
"google/cloud-sdk:emulators",
command = "gcloud" :: "beta" :: "emulators" :: "pubsub" :: "start" :: "--project=test-project"
:: "--host-port=0.0.0.0:8085" :: Nil,
exposedPorts = Seq(8085),
waitStrategy = Wait.forLogMessage(".*Server started, listening on 8085.*", 1).some
) {

def uri = Uri.unsafeFromString(s"http://localhost:${mappedPort(8681)}")
def uri = Uri.unsafeFromString(s"http://localhost:${mappedPort(8085)}")

}

val containerFixture = new ForAllContainerFixture[GenericContainer](container)

override def munitFixtures = super.munitFixtures :+ projectsFixture :+ containerFixture

}
8 changes: 4 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ object Dependencies {
lazy val `http4s-grpc` = "io.chrisdavenport" %% "http4s-grpc" % "0.0.4"

lazy val grpc = Seq(
"com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.108.6",
"com.google.api.grpc" % "proto-google-common-protos" % "2.39.1",
"com.google.api.grpc" % "proto-google-cloud-pubsub-v1" % "1.113.0",
"com.google.api.grpc" % "proto-google-common-protos" % "2.41.0",
"com.google.protobuf" % "protobuf-java" % "3.25.3"
).map(_ % "protobuf-src" intransitive ()) ++ Seq(
"com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
Expand All @@ -25,7 +25,7 @@ object Dependencies {
lazy val `fs2-pubsub` = Seq(
"co.fs2" %% "fs2-core" % "3.10.2",
"com.permutive" %% "common-types-gcp-http4s" % "1.0.0",
"io.circe" %% "circe-parser" % "0.14.7",
"io.circe" %% "circe-parser" % "0.14.9",
"org.http4s" %% "http4s-circe" % "0.23.27",
"org.http4s" %% "http4s-client" % "0.23.27",
"org.http4s" %% "http4s-dsl" % "0.23.27"
Expand All @@ -38,7 +38,7 @@ object Dependencies {
).map(_ % Test)

lazy val `fs2-pubsub-pureconfig` = Seq(
"com.github.pureconfig" %% "pureconfig-http4s" % "0.17.6",
"com.github.pureconfig" %% "pureconfig-http4s" % "0.17.7",
"com.permutive" %% "common-types-gcp-pureconfig" % "1.0.0"
)

Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.9.9
sbt.version=1.10.1
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")
addSbtPlugin("ch.epfl.scala" % "sbt-version-policy" % "3.2.1")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.2")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.4")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
addSbtPlugin("io.chrisdavenport" % "sbt-http4s-grpc" % "0.0.4")