From e04b542ef5f76f657c1def7fa5204da9cb3286f5 Mon Sep 17 00:00:00 2001 From: Gavin Bisesi Date: Tue, 5 Sep 2023 12:50:08 -0400 Subject: [PATCH 1/2] Fix incorrect CompletableFuture integration The cancellation logic is much more complex than the previous implementation handled This will work correctly with the changes to Async in CE 3.5+ --- project/TaglessGen.scala | 37 ++++--------------- .../pure/cloudwatch/tagless/Interpreter.scala | 30 ++------------- .../pure/dynamodb/tagless/Interpreter.scala | 30 ++------------- .../pure/kinesis/tagless/Interpreter.scala | 30 ++------------- .../pure/s3/tagless/Interpreter.scala | 30 ++------------- .../pure/sns/tagless/Interpreter.scala | 30 ++------------- .../pure/sqs/tagless/Interpreter.scala | 30 ++------------- 7 files changed, 31 insertions(+), 186 deletions(-) diff --git a/project/TaglessGen.scala b/project/TaglessGen.scala index 4c7a0abd..6e8703fa 100644 --- a/project/TaglessGen.scala +++ b/project/TaglessGen.scala @@ -366,38 +366,15 @@ class TaglessGen( | | ${managed.map(interpreterDef).mkString("\n ")} | // Some methods are common to all interpreters and can be overridden to change behavior globally. - | - | def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(a => asyncM.blocking(f(a))) - | def primitive1[J, A](f: =>A): M[A] = asyncM.blocking(f) | - | def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { a => - | asyncM.async_ { cb => - | fut(a).handle[Unit] { (a, x) => - | if (a == null) - | x match { - | case t: CompletionException => cb(Left(t.getCause)) - | case t => cb(Left(t)) - | } - | else - | cb(Right(a)) - | } - | () - | } - | } - | def eff1[J, A](fut: =>CompletableFuture[A]): M[A] = - | asyncM.async_ { cb => - | fut.handle[Unit] { (a, x) => - | if (a == null) - | x match { - | case t: CompletionException => cb(Left(t.getCause)) - | case t => cb(Left(t)) - | } - | else - | cb(Right(a)) - | } - | () - | } + | def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(j => asyncM.blocking(f(j))) + | def primitive1[J, A](f: => A): M[A] = asyncM.blocking(f) | + | def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { j => + | asyncM.fromCompletableFuture(asyncM.delay(fut(j))) + | } + | def eff1[J, A](fut: => CompletableFuture[A]): M[A] = + | asyncM.fromCompletableFuture(asyncM.delay(fut)) | | // Interpreters |${managed.map(ClassTag(_)).map(kleisliInterp(_)).mkString("\n")} diff --git a/pure-aws/pure-cloudwatch-tagless/src/main/scala/io/laserdisc/pure/cloudwatch/tagless/Interpreter.scala b/pure-aws/pure-cloudwatch-tagless/src/main/scala/io/laserdisc/pure/cloudwatch/tagless/Interpreter.scala index 486d0069..a08e59da 100644 --- a/pure-aws/pure-cloudwatch-tagless/src/main/scala/io/laserdisc/pure/cloudwatch/tagless/Interpreter.scala +++ b/pure-aws/pure-cloudwatch-tagless/src/main/scala/io/laserdisc/pure/cloudwatch/tagless/Interpreter.scala @@ -32,36 +32,14 @@ trait Interpreter[M[_]] { outer => lazy val CloudWatchAsyncClientInterpreter: CloudWatchAsyncClientInterpreter = new CloudWatchAsyncClientInterpreter {} // Some methods are common to all interpreters and can be overridden to change behavior globally. - def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(a => asyncM.blocking(f(a))) + def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(j => asyncM.blocking(f(j))) def primitive1[J, A](f: => A): M[A] = asyncM.blocking(f) - def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { a => - asyncM.async_ { cb => - fut(a).handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { j => + asyncM.fromCompletableFuture(asyncM.delay(fut(j))) } def eff1[J, A](fut: => CompletableFuture[A]): M[A] = - asyncM.async_ { cb => - fut.handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + asyncM.fromCompletableFuture(asyncM.delay(fut)) // Interpreters trait CloudWatchAsyncClientInterpreter extends CloudWatchAsyncClientOp[Kleisli[M, CloudWatchAsyncClient, *]] { diff --git a/pure-aws/pure-dynamodb-tagless/src/main/scala/io/laserdisc/pure/dynamodb/tagless/Interpreter.scala b/pure-aws/pure-dynamodb-tagless/src/main/scala/io/laserdisc/pure/dynamodb/tagless/Interpreter.scala index 77cde7e9..4dfa1ae9 100644 --- a/pure-aws/pure-dynamodb-tagless/src/main/scala/io/laserdisc/pure/dynamodb/tagless/Interpreter.scala +++ b/pure-aws/pure-dynamodb-tagless/src/main/scala/io/laserdisc/pure/dynamodb/tagless/Interpreter.scala @@ -32,36 +32,14 @@ trait Interpreter[M[_]] { outer => lazy val DynamoDbAsyncClientInterpreter: DynamoDbAsyncClientInterpreter = new DynamoDbAsyncClientInterpreter {} // Some methods are common to all interpreters and can be overridden to change behavior globally. - def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(a => asyncM.blocking(f(a))) + def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(j => asyncM.blocking(f(j))) def primitive1[J, A](f: => A): M[A] = asyncM.blocking(f) - def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { a => - asyncM.async_ { cb => - fut(a).handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { j => + asyncM.fromCompletableFuture(asyncM.delay(fut(j))) } def eff1[J, A](fut: => CompletableFuture[A]): M[A] = - asyncM.async_ { cb => - fut.handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + asyncM.fromCompletableFuture(asyncM.delay(fut)) // Interpreters trait DynamoDbAsyncClientInterpreter extends DynamoDbAsyncClientOp[Kleisli[M, DynamoDbAsyncClient, *]] { diff --git a/pure-aws/pure-kinesis-tagless/src/main/scala/io/laserdisc/pure/kinesis/tagless/Interpreter.scala b/pure-aws/pure-kinesis-tagless/src/main/scala/io/laserdisc/pure/kinesis/tagless/Interpreter.scala index 595e71a9..7460cbf9 100644 --- a/pure-aws/pure-kinesis-tagless/src/main/scala/io/laserdisc/pure/kinesis/tagless/Interpreter.scala +++ b/pure-aws/pure-kinesis-tagless/src/main/scala/io/laserdisc/pure/kinesis/tagless/Interpreter.scala @@ -32,36 +32,14 @@ trait Interpreter[M[_]] { outer => lazy val KinesisAsyncClientInterpreter: KinesisAsyncClientInterpreter = new KinesisAsyncClientInterpreter {} // Some methods are common to all interpreters and can be overridden to change behavior globally. - def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(a => asyncM.blocking(f(a))) + def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(j => asyncM.blocking(f(j))) def primitive1[J, A](f: => A): M[A] = asyncM.blocking(f) - def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { a => - asyncM.async_ { cb => - fut(a).handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { j => + asyncM.fromCompletableFuture(asyncM.delay(fut(j))) } def eff1[J, A](fut: => CompletableFuture[A]): M[A] = - asyncM.async_ { cb => - fut.handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + asyncM.fromCompletableFuture(asyncM.delay(fut)) // Interpreters trait KinesisAsyncClientInterpreter extends KinesisAsyncClientOp[Kleisli[M, KinesisAsyncClient, *]] { diff --git a/pure-aws/pure-s3-tagless/src/main/scala/io/laserdisc/pure/s3/tagless/Interpreter.scala b/pure-aws/pure-s3-tagless/src/main/scala/io/laserdisc/pure/s3/tagless/Interpreter.scala index 6e92b985..1e941e1b 100644 --- a/pure-aws/pure-s3-tagless/src/main/scala/io/laserdisc/pure/s3/tagless/Interpreter.scala +++ b/pure-aws/pure-s3-tagless/src/main/scala/io/laserdisc/pure/s3/tagless/Interpreter.scala @@ -34,36 +34,14 @@ trait Interpreter[M[_]] { outer => lazy val S3AsyncClientInterpreter: S3AsyncClientInterpreter = new S3AsyncClientInterpreter {} // Some methods are common to all interpreters and can be overridden to change behavior globally. - def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(a => asyncM.blocking(f(a))) + def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(j => asyncM.blocking(f(j))) def primitive1[J, A](f: => A): M[A] = asyncM.blocking(f) - def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { a => - asyncM.async_ { cb => - fut(a).handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { j => + asyncM.fromCompletableFuture(asyncM.delay(fut(j))) } def eff1[J, A](fut: => CompletableFuture[A]): M[A] = - asyncM.async_ { cb => - fut.handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + asyncM.fromCompletableFuture(asyncM.delay(fut)) // Interpreters trait S3AsyncClientInterpreter extends S3AsyncClientOp[Kleisli[M, S3AsyncClient, *]] { diff --git a/pure-aws/pure-sns-tagless/src/main/scala/io/laserdisc/pure/sns/tagless/Interpreter.scala b/pure-aws/pure-sns-tagless/src/main/scala/io/laserdisc/pure/sns/tagless/Interpreter.scala index d25b05a0..3b62b666 100644 --- a/pure-aws/pure-sns-tagless/src/main/scala/io/laserdisc/pure/sns/tagless/Interpreter.scala +++ b/pure-aws/pure-sns-tagless/src/main/scala/io/laserdisc/pure/sns/tagless/Interpreter.scala @@ -32,36 +32,14 @@ trait Interpreter[M[_]] { outer => lazy val SnsAsyncClientInterpreter: SnsAsyncClientInterpreter = new SnsAsyncClientInterpreter {} // Some methods are common to all interpreters and can be overridden to change behavior globally. - def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(a => asyncM.blocking(f(a))) + def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(j => asyncM.blocking(f(j))) def primitive1[J, A](f: => A): M[A] = asyncM.blocking(f) - def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { a => - asyncM.async_ { cb => - fut(a).handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { j => + asyncM.fromCompletableFuture(asyncM.delay(fut(j))) } def eff1[J, A](fut: => CompletableFuture[A]): M[A] = - asyncM.async_ { cb => - fut.handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + asyncM.fromCompletableFuture(asyncM.delay(fut)) // Interpreters trait SnsAsyncClientInterpreter extends SnsAsyncClientOp[Kleisli[M, SnsAsyncClient, *]] { diff --git a/pure-aws/pure-sqs-tagless/src/main/scala/io/laserdisc/pure/sqs/tagless/Interpreter.scala b/pure-aws/pure-sqs-tagless/src/main/scala/io/laserdisc/pure/sqs/tagless/Interpreter.scala index bcbaeb01..f3dac0fd 100644 --- a/pure-aws/pure-sqs-tagless/src/main/scala/io/laserdisc/pure/sqs/tagless/Interpreter.scala +++ b/pure-aws/pure-sqs-tagless/src/main/scala/io/laserdisc/pure/sqs/tagless/Interpreter.scala @@ -32,36 +32,14 @@ trait Interpreter[M[_]] { outer => lazy val SqsAsyncClientInterpreter: SqsAsyncClientInterpreter = new SqsAsyncClientInterpreter {} // Some methods are common to all interpreters and can be overridden to change behavior globally. - def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(a => asyncM.blocking(f(a))) + def primitive[J, A](f: J => A): Kleisli[M, J, A] = Kleisli(j => asyncM.blocking(f(j))) def primitive1[J, A](f: => A): M[A] = asyncM.blocking(f) - def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { a => - asyncM.async_ { cb => - fut(a).handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + def eff[J, A](fut: J => CompletableFuture[A]): Kleisli[M, J, A] = Kleisli { j => + asyncM.fromCompletableFuture(asyncM.delay(fut(j))) } def eff1[J, A](fut: => CompletableFuture[A]): M[A] = - asyncM.async_ { cb => - fut.handle[Unit] { (a, x) => - if (a == null) - x match { - case t: CompletionException => cb(Left(t.getCause)) - case t => cb(Left(t)) - } - else - cb(Right(a)) - } - () - } + asyncM.fromCompletableFuture(asyncM.delay(fut)) // Interpreters trait SqsAsyncClientInterpreter extends SqsAsyncClientOp[Kleisli[M, SqsAsyncClient, *]] { From 6193d06cad1ec411bb5f1c0eb0d827993fd418bf Mon Sep 17 00:00:00 2001 From: "dmytro.obodowsky" Date: Thu, 21 Sep 2023 14:09:36 -0500 Subject: [PATCH 2/2] fix branch build --- .../io/laserdisc/pure/cloudwatch/tagless/Interpreter.scala | 2 -- .../scala/io/laserdisc/pure/dynamodb/tagless/Interpreter.scala | 2 -- .../scala/io/laserdisc/pure/kinesis/tagless/Interpreter.scala | 2 -- .../main/scala/io/laserdisc/pure/s3/tagless/Interpreter.scala | 2 -- .../main/scala/io/laserdisc/pure/sns/tagless/Interpreter.scala | 2 -- .../main/scala/io/laserdisc/pure/sqs/tagless/Interpreter.scala | 2 -- 6 files changed, 12 deletions(-) diff --git a/pure-aws/pure-cloudwatch-tagless/src/main/scala/io/laserdisc/pure/cloudwatch/tagless/Interpreter.scala b/pure-aws/pure-cloudwatch-tagless/src/main/scala/io/laserdisc/pure/cloudwatch/tagless/Interpreter.scala index a08e59da..07747f8f 100644 --- a/pure-aws/pure-cloudwatch-tagless/src/main/scala/io/laserdisc/pure/cloudwatch/tagless/Interpreter.scala +++ b/pure-aws/pure-cloudwatch-tagless/src/main/scala/io/laserdisc/pure/cloudwatch/tagless/Interpreter.scala @@ -5,8 +5,6 @@ import cats.data.Kleisli import cats.effect.{Async, Resource} import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder -import java.util.concurrent.CompletionException - // Types referenced import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient import software.amazon.awssdk.services.cloudwatch.model.* diff --git a/pure-aws/pure-dynamodb-tagless/src/main/scala/io/laserdisc/pure/dynamodb/tagless/Interpreter.scala b/pure-aws/pure-dynamodb-tagless/src/main/scala/io/laserdisc/pure/dynamodb/tagless/Interpreter.scala index 4dfa1ae9..8880d926 100644 --- a/pure-aws/pure-dynamodb-tagless/src/main/scala/io/laserdisc/pure/dynamodb/tagless/Interpreter.scala +++ b/pure-aws/pure-dynamodb-tagless/src/main/scala/io/laserdisc/pure/dynamodb/tagless/Interpreter.scala @@ -5,8 +5,6 @@ import cats.data.Kleisli import cats.effect.{Async, Resource} import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder -import java.util.concurrent.CompletionException - // Types referenced import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.dynamodb.model.* diff --git a/pure-aws/pure-kinesis-tagless/src/main/scala/io/laserdisc/pure/kinesis/tagless/Interpreter.scala b/pure-aws/pure-kinesis-tagless/src/main/scala/io/laserdisc/pure/kinesis/tagless/Interpreter.scala index 7460cbf9..d13568b0 100644 --- a/pure-aws/pure-kinesis-tagless/src/main/scala/io/laserdisc/pure/kinesis/tagless/Interpreter.scala +++ b/pure-aws/pure-kinesis-tagless/src/main/scala/io/laserdisc/pure/kinesis/tagless/Interpreter.scala @@ -5,8 +5,6 @@ import cats.data.Kleisli import cats.effect.{Async, Resource} import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder -import java.util.concurrent.CompletionException - // Types referenced import software.amazon.awssdk.services.kinesis.KinesisAsyncClient import software.amazon.awssdk.services.kinesis.model.* diff --git a/pure-aws/pure-s3-tagless/src/main/scala/io/laserdisc/pure/s3/tagless/Interpreter.scala b/pure-aws/pure-s3-tagless/src/main/scala/io/laserdisc/pure/s3/tagless/Interpreter.scala index 1e941e1b..16415c9f 100644 --- a/pure-aws/pure-s3-tagless/src/main/scala/io/laserdisc/pure/s3/tagless/Interpreter.scala +++ b/pure-aws/pure-s3-tagless/src/main/scala/io/laserdisc/pure/s3/tagless/Interpreter.scala @@ -6,8 +6,6 @@ import cats.effect.{Async, Resource} import software.amazon.awssdk.services.s3.S3AsyncClientBuilder import software.amazon.awssdk.services.s3.model.* -import java.util.concurrent.CompletionException - // Types referenced import software.amazon.awssdk.core.async.{AsyncRequestBody, AsyncResponseTransformer} import software.amazon.awssdk.services.s3.S3AsyncClient diff --git a/pure-aws/pure-sns-tagless/src/main/scala/io/laserdisc/pure/sns/tagless/Interpreter.scala b/pure-aws/pure-sns-tagless/src/main/scala/io/laserdisc/pure/sns/tagless/Interpreter.scala index 3b62b666..e752a4c9 100644 --- a/pure-aws/pure-sns-tagless/src/main/scala/io/laserdisc/pure/sns/tagless/Interpreter.scala +++ b/pure-aws/pure-sns-tagless/src/main/scala/io/laserdisc/pure/sns/tagless/Interpreter.scala @@ -5,8 +5,6 @@ import cats.data.Kleisli import cats.effect.{Async, Resource} import software.amazon.awssdk.services.sns.SnsAsyncClientBuilder -import java.util.concurrent.CompletionException - // Types referenced import software.amazon.awssdk.services.sns.SnsAsyncClient import software.amazon.awssdk.services.sns.model.* diff --git a/pure-aws/pure-sqs-tagless/src/main/scala/io/laserdisc/pure/sqs/tagless/Interpreter.scala b/pure-aws/pure-sqs-tagless/src/main/scala/io/laserdisc/pure/sqs/tagless/Interpreter.scala index f3dac0fd..ea5a3dd8 100644 --- a/pure-aws/pure-sqs-tagless/src/main/scala/io/laserdisc/pure/sqs/tagless/Interpreter.scala +++ b/pure-aws/pure-sqs-tagless/src/main/scala/io/laserdisc/pure/sqs/tagless/Interpreter.scala @@ -5,8 +5,6 @@ import cats.data.Kleisli import cats.effect.{Async, Resource} import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder -import java.util.concurrent.CompletionException - // Types referenced import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.model.*