Skip to content

Commit

Permalink
Update to v1.0.3 of conformance suite (#318)
Browse files Browse the repository at this point in the history
This also fixes a bug in the conformance client with
how cancellation was implemented for the "suspend"
invocation style. Due to nuances of how coroutineScope
works, the previous formulation was incorrect, and was
only revealed by updated test cases in the v1.0.3 suite.
  • Loading branch information
jhump authored Aug 16, 2024
1 parent 972eeea commit 09d80a2
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 104 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ BIN := .tmp/bin
CACHE := .tmp/cache
LICENSE_HEADER_YEAR_RANGE := 2022-2023
LICENSE_HEADER_VERSION := v1.34.0
CONFORMANCE_VERSION := v1.0.2
CONFORMANCE_VERSION := v1.0.3
PROTOC_VERSION ?= $(shell yq '.versions.protobuf' gradle/libs.versions.toml | cut -d'.' -f2-)
ifeq ($(PROTOC_VERSION),)
$(error "Unable to determine protoc version")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.connectrpc.conformance.client

import com.connectrpc.Code
import com.connectrpc.ConnectException
import com.connectrpc.Headers
import com.connectrpc.ProtocolClientConfig
Expand All @@ -22,6 +23,7 @@ import com.connectrpc.ResponseMessage
import com.connectrpc.SerializationStrategy
import com.connectrpc.asConnectException
import com.connectrpc.compression.GzipCompressionPool
import com.connectrpc.conformance.client.ClientArgs.UnaryInvokeStyle
import com.connectrpc.conformance.client.adapt.AnyMessage
import com.connectrpc.conformance.client.adapt.BidiStreamClient
import com.connectrpc.conformance.client.adapt.ClientCompatRequest
Expand All @@ -43,7 +45,10 @@ import com.connectrpc.impl.ProtocolClient
import com.connectrpc.okhttp.ConnectOkHttpClient
import com.connectrpc.protocols.GETConfiguration
import com.google.protobuf.MessageLite
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import okhttp3.OkHttpClient
Expand Down Expand Up @@ -133,23 +138,57 @@ class Client(
}
val msg = fromAny(req.requestMessages[0], client.reqTemplate, requestType)
val resp = CompletableDeferred<ResponseMessage<Resp>>()
val canceler = client.execute(
args.invokeStyle,
msg,
req.requestHeaders,
resp::complete,
)
when (val cancel = req.cancel) {
is Cancel.AfterCloseSendMs -> {
delay(cancel.millis.toLong())
canceler()

return coroutineScope {
val canceler: Cancelable
when (args.unaryInvokeStyle) {
UnaryInvokeStyle.CALLBACK -> {
canceler = client.execute(msg, req.requestHeaders, resp::complete)
}
UnaryInvokeStyle.SUSPEND -> {
val job = launch {
try {
resp.complete(client.execute(msg, req.requestHeaders))
} catch (ex: Throwable) {
val code = if (ex is CancellationException) {
Code.CANCELED
} else {
Code.UNKNOWN
}
resp.complete(
ResponseMessage.Failure(
cause = ConnectException(code, exception = ex),
headers = emptyMap(),
trailers = emptyMap(),
),
)
}
}
canceler = { job.cancel() }
}
UnaryInvokeStyle.BLOCKING -> {
val call = client.blocking(msg, req.requestHeaders)
launch(Dispatchers.IO) {
resp.complete(call.execute())
}
canceler = { call.cancel() }
}
}
else -> {
// We already validated the case above.
// So this case means no cancellation.

when (val cancel = req.cancel) {
is Cancel.AfterCloseSendMs -> {
launch {
delay(cancel.millis.toLong())
canceler()
}
}
else -> {
// We already validated the case above.
// So this case means no cancellation.
}
}
unaryResult(0, resp.await())
}
return unaryResult(0, resp.await())
}

private suspend fun <Req : MessageLite, Resp : MessageLite> handleClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@

package com.connectrpc.conformance.client

import com.connectrpc.conformance.client.adapt.UnaryClient.InvokeStyle

data class ClientArgs(
val invokeStyle: InvokeStyle,
val unaryInvokeStyle: UnaryInvokeStyle,
val verbose: VerbosePrinter,
) {
companion object {
fun parseArgs(args: Array<String>): ClientArgs {
var invokeStyle = InvokeStyle.SUSPEND
var unaryInvokeStyle = UnaryInvokeStyle.SUSPEND
var verbosity = 0
var skip = false
for (i in args.indices) {
Expand All @@ -39,13 +37,13 @@ data class ClientArgs(
val v = args[i + 1]
when (v.lowercase()) {
"suspend" -> {
invokeStyle = InvokeStyle.SUSPEND
unaryInvokeStyle = UnaryInvokeStyle.SUSPEND
}
"callback" -> {
invokeStyle = InvokeStyle.CALLBACK
unaryInvokeStyle = UnaryInvokeStyle.CALLBACK
}
"blocking" -> {
invokeStyle = InvokeStyle.BLOCKING
unaryInvokeStyle = UnaryInvokeStyle.BLOCKING
}
else -> {
throw RuntimeException("value for $arg option should be 'suspend', 'callback', or 'blocking'; instead got '$v'")
Expand Down Expand Up @@ -73,7 +71,40 @@ data class ClientArgs(
}
}
}
return ClientArgs(invokeStyle, VerbosePrinter(verbosity, "* client: "))
return ClientArgs(unaryInvokeStyle, VerbosePrinter(verbosity, "* client: "))
}
}

/**
* The style of invocation, one each for the three different
* ways to invoke a unary RPC.
*/
enum class UnaryInvokeStyle {
/**
* Indicates the callback-based async signature, which
* invokes the method with the following signature:
* ```
* fun execute(Req, Headers, (ResponseMessage<Resp>)->Unit): Cancelable
* ```
*/
CALLBACK,

/**
* Indicates the suspend-based async signature, which
* invokes the method with the following signature:
* ```
* suspend fun execute(Req, Headers): ResponseMessage<Resp>
* ```
*/
SUSPEND,

/**
* Indicates the blocking signature, which invokes the
* method with the following signature:
* ```
* fun blocking(Req, Headers): UnaryBlockingCall<Resp>
* ```
*/
BLOCKING,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ import com.connectrpc.ResponseMessage
import com.connectrpc.UnaryBlockingCall
import com.connectrpc.http.Cancelable
import com.google.protobuf.MessageLite
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

/**
* The client of a unary RPC operation. This provides multiple ways
Expand All @@ -40,82 +37,4 @@ abstract class UnaryClient<Req : MessageLite, Resp : MessageLite>(
abstract fun execute(req: Req, headers: Headers, onFinish: (ResponseMessage<Resp>) -> Unit): Cancelable

abstract fun blocking(req: Req, headers: Headers): UnaryBlockingCall<Resp>

/**
* Executes the unary RPC using the given invocation style, request
* message, and request headers. The given callback is invoked when
* the operation completes.
*
* This signature resembles the one above that takes a callback, but
* will adapt the call to the suspend or blocking signatures if so
* directed by the given InvokeStyle. This allows a caller to use a
* single shape to invoke the RPC, but actually exercise any/all of
* the above three signatures.
*/
suspend fun execute(
style: InvokeStyle,
req: Req,
headers: Headers,
onFinish: (ResponseMessage<Resp>) -> Unit,
): Cancelable {
when (style) {
InvokeStyle.CALLBACK -> {
return execute(req, headers, onFinish)
}
InvokeStyle.SUSPEND -> {
return coroutineScope {
val job = launch {
onFinish(execute(req, headers))
}
return@coroutineScope {
job.cancel()
}
}
}
InvokeStyle.BLOCKING -> {
val call = blocking(req, headers)
coroutineScope {
launch(Dispatchers.IO) {
onFinish(call.execute())
}
}
return {
call.cancel()
}
}
}
}

/**
* The style of invocation, one each for the three different
* ways to invoke a unary RPC.
*/
enum class InvokeStyle {
/**
* Indicates the callback-based async signature, which
* invokes the method with the following signature:
* ```
* fun execute(Req, Headers, (ResponseMessage<Resp>)->Unit): Cancelable
* ```
*/
CALLBACK,

/**
* Indicates the suspend-based async signature, which
* invokes the method with the following signature:
* ```
* suspend fun execute(Req, Headers): ResponseMessage<Resp>
* ```
*/
SUSPEND,

/**
* Indicates the blocking signature, which invokes the
* method with the following signature:
* ```
* fun blocking(Req, Headers): UnaryBlockingCall<Resp>
* ```
*/
BLOCKING,
}
}

0 comments on commit 09d80a2

Please sign in to comment.