Skip to content

Commit

Permalink
Fixes issue with passing stream as a request body
Browse files Browse the repository at this point in the history
* Instead of passing the stream as req.body it is necessary to use the Request.streamBody method (when we deal with non-blocking/async streams. For this reason, the pattern match distinguishing request types has been enhanced to check if the request body is supposed to be a stream and if yes, the .streamBody method is called to set it.
  • Loading branch information
Michal Stopyra committed Feb 7, 2025
1 parent 30fa5b7 commit 9ed2a31
Showing 1 changed file with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,27 @@ private[sttp] class EndpointToSttpClient[R](clientOptions: SttpClientOptions, ws
case other => other
}

(bodyIsStream(e.output), isWebSocket) match {
case (Some(streams), _) =>
(bodyIsStream(e.input), bodyIsStream(e.output), isWebSocket) match {
case (Some(streamsIn), None, _) => // request body is a stream
req3
.streamBody(streamsIn)(iParams.asInstanceOf[streamsIn.BinaryStream])
.asInstanceOf[GenericRequest[DecodeResult[Either[E, O]], Any]]
case (None, Some(streamsOut), _) => // response is a stream
req3
.response(
asStreamAlwaysUnsafe(streams).mapWithMetadata(mapWithMetadataF).map(mapF)
asStreamAlwaysUnsafe(streamsOut).mapWithMetadata(mapWithMetadataF).map(mapF)
)
.asInstanceOf[GenericRequest[DecodeResult[Either[E, O]], Any]]
case (_, true) =>
case (Some(streamsIn), Some(streamsOut), _) => { // both request body and response are streams
req3
.streamBody(streamsIn)(iParams.asInstanceOf[streamsIn.BinaryStream])
.asInstanceOf[StreamRequest[Any, Any]]
.response(
asStreamAlwaysUnsafe(streamsOut).mapWithMetadata(mapWithMetadataF).map(mapF)
)
.asInstanceOf[GenericRequest[DecodeResult[Either[E, O]], Any]]
}
case (None, None, true) =>
req3
.response(
async
Expand All @@ -70,7 +83,7 @@ private[sttp] class EndpointToSttpClient[R](clientOptions: SttpClientOptions, ws
.map(mapF)
)
.asInstanceOf[GenericRequest[DecodeResult[Either[E, O]], Any]]
case (None, false) =>
case (None, None, false) =>
val response = fromMetadata(
outToResponseAs(e.errorOutput),
ConditionalResponseAs(isSuccess, outToResponseAs(e.output))
Expand Down Expand Up @@ -123,10 +136,8 @@ private[sttp] class EndpointToSttpClient[R](clientOptions: SttpClientOptions, ws
) =>
val req2 = req.body(value.asInstanceOf[InputStream])
(uri, req2)
case EndpointIO.OneOfBody(Nil, _) => throw new RuntimeException("One of body without variants")
case EndpointIO.StreamBodyWrapper(StreamBodyIO(streams, _, _, _, _)) =>
val req2 = req.body(value.asInstanceOf[InputStream])
(uri, req2)
case EndpointIO.OneOfBody(Nil, _) => throw new RuntimeException("One of body without variants")
case EndpointIO.StreamBodyWrapper(StreamBodyIO(streams, _, _, _, _)) => (uri, req)
case EndpointIO.Header(name, codec, _) =>
val req2 = codec
.encode(value)
Expand Down Expand Up @@ -248,6 +259,13 @@ private[sttp] class EndpointToSttpClient[R](clientOptions: SttpClientOptions, ws
}.headOption
}

private def bodyIsStream[I](in: EndpointInput[I]): Option[Streams[_]] = {
in.traverseInputs {
case EndpointIO.StreamBodyWrapper(StreamBodyIO(streams, _, _, _, _)) => Vector(streams)
case EndpointIO.OneOfBody(variants, _) => variants.flatMap(_.body.toOption).map(_.wrapped.streams).toVector
}.headOption
}

private def bodyIsWebSocket[I](out: EndpointOutput[I]): Boolean = {
out.traverseOutputs { case EndpointOutput.WebSocketBodyWrapper(_) =>
Vector(())
Expand Down

0 comments on commit 9ed2a31

Please sign in to comment.