Skip to content

Commit

Permalink
fix: Always use coroutine dispatcher as executor for Spanner.
Browse files Browse the repository at this point in the history
The previous behavior involved passing a separate Executor for read-write transactions that was independent of the coroutine context.
  • Loading branch information
SanjayVas committed Oct 21, 2024
1 parent 6f79ccf commit 678c23c
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ kt_jvm_library(
"//imports/kotlin/com/google/type:date_kt_jvm_proto",
"//imports/kotlin/kotlinx/coroutines:core",
"//imports/kotlin/kotlinx/coroutines/guava",
"//src/main/kotlin/org/wfanet/measurement/common/guava",
],
)
65 changes: 28 additions & 37 deletions src/main/kotlin/org/wfanet/measurement/gcloud/common/Futures.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,41 @@
package org.wfanet.measurement.gcloud.common

import com.google.api.core.ApiFuture
import com.google.api.core.ForwardingApiFuture
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.Uninterruptibles
import java.util.concurrent.ExecutionException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.guava.asListenableFuture
import kotlinx.coroutines.suspendCancellableCoroutine

/**
* Suspends until the [ApiFuture] completes.
*
* @see kotlinx.coroutines.guava.await
*/
suspend fun <T> ApiFuture<T>.await(): T {
try {
if (isDone) return Uninterruptibles.getUninterruptibly(this)
} catch (e: ExecutionException) {
throw e.cause!!
}

return suspendCancellableCoroutine { cont ->
addListener(
{
if (isCancelled) {
cont.cancel()
} else {
try {
cont.resume(Uninterruptibles.getUninterruptibly(this))
} catch (e: ExecutionException) {
cont.resumeWithException(e.cause!!)
}
}
},
MoreExecutors.directExecutor()
)
}
}
import kotlinx.coroutines.guava.await
import org.wfanet.measurement.common.guava.awaitAsync

fun <T> Deferred<T>.asApiFuture(): ApiFuture<T> = asListenableFuture().asApiFuture()

suspend fun <T> ApiFuture<T>.await(): T = asListenableFuture().await()

private class ListenableFutureAdapter<T>(delegate: ListenableFuture<T>) :
SimpleForwardingListenableFuture<T>(delegate), ApiFuture<T>

private fun <T> ListenableFuture<T>.asApiFuture(): ApiFuture<T> = ListenableFutureAdapter(this)
private fun <T> ListenableFuture<T>.asApiFuture(): ApiFuture<T> {
@Suppress("UNCHECKED_CAST") return (this as? ApiFuture<T>) ?: ListenableFutureAdapter(this)
}

private class ApiFutureAdapter<T>(delegate: ApiFuture<T>) :
ForwardingApiFuture<T>(delegate), ListenableFuture<T>

fun <T> ApiFuture<T>.asListenableFuture(): ListenableFuture<T> {
@Suppress("UNCHECKED_CAST") return (this as? ListenableFuture<T>) ?: ApiFutureAdapter<T>(this)
}

/**
* Returns a [Deferred] which observes the [ListenableFuture] returned by [init].
*
* This should only be used in cases where [await] cannot be immediately called on the [ApiFuture].
*
* @see org.wfanet.measurement.common.guava.awaitAsync
*/
inline fun <T> CoroutineScope.awaitAsync(crossinline init: () -> ApiFuture<T>): Deferred<T> =
awaitAsync {
init().asListenableFuture()
}
Loading

0 comments on commit 678c23c

Please sign in to comment.