Skip to content

Commit

Permalink
Enable compression for outgoing requests (#4279)
Browse files Browse the repository at this point in the history
* Enable compression for outgoing requests

---------

Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored Sep 19, 2023
1 parent 86a4463 commit 46d110a
Show file tree
Hide file tree
Showing 15 changed files with 55 additions and 33 deletions.
12 changes: 11 additions & 1 deletion delta/app/src/main/resources/app.conf
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ app {
}

defaults {
http-client {
http-client-compression {
# the retry strategy for the http client
retry = ${app.defaults.constant-retry-strategy}
# the strategy to decide if it is worth retrying when an Http error occurs.
Expand All @@ -320,6 +320,16 @@ app {
compression = true
}

http-client-no-compression {
# the retry strategy for the http client
retry = ${app.defaults.constant-retry-strategy}
# the strategy to decide if it is worth retrying when an Http error occurs.
# allowed strategies are 'always', 'never' or 'onServerError'.
is-worth-retrying = "onServerError"
# Flag to decide whether or not to support compression
compression = false
}

# default query configuration
query {
batch-size = 30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object RealmsModule extends ModuleDef {
}

make[HttpClient].named("realm").from { (as: ActorSystem[Nothing], sc: Scheduler) =>
HttpClient.noRetry()(as.classicSystem, sc)
HttpClient.noRetry(compression = false)(as.classicSystem, sc)
}

many[SseEncoder[_]].add { base: BaseUri => RealmEvent.sseEncoder(base) }
Expand Down
4 changes: 2 additions & 2 deletions delta/plugins/blazegraph/src/main/resources/blazegraph.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ plugins.blazegraph {
#}

# configuration of the indexing Blazegraph client
indexing-client = ${app.defaults.http-client}
indexing-client = ${app.defaults.http-client-no-compression}

# configuration of the query Blazegraph client
query-client = ${app.defaults.http-client}
query-client = ${app.defaults.http-client-no-compression}
query-client.is-worth-retrying = "never"

# Blazegraph query timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object BlazegraphClientSetup {

def resource()(implicit s: Scheduler): Resource[Task, BlazegraphClient] = {
for {
(httpClient, actorSystem) <- HttpClientSetup()
(httpClient, actorSystem) <- HttpClientSetup(compression = false)
container <- BlazegraphContainer.resource()
} yield {
implicit val as: ActorSystem = actorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class BlazegraphViewsQuerySpec(docker: BlazegraphDocker)
}

implicit private val sc: Scheduler = Scheduler.global
implicit private val httpConfig: HttpClientConfig = HttpClientConfig(AlwaysGiveUp, HttpClientWorthRetry.never, true)
implicit private val httpConfig: HttpClientConfig = HttpClientConfig(AlwaysGiveUp, HttpClientWorthRetry.never, false)
implicit private val baseUri: BaseUri = BaseUri("http://localhost", Label.unsafe("v1"))

implicit private val uuidF: UUIDF = UUIDF.random
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ plugins.composite-views {
pagination = ${app.defaults.pagination}
# the HTTP client configuration for a remote source
remote-source-client {
http = ${app.defaults.http-client}
http = ${app.defaults.http-client-compression}
retry-delay = 1 minute
# the maximum batching size, corresponding to the maximum number of Blazegraph documents uploaded on a bulk request.
# in this window, duplicated persistence ids are discarded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ plugins.elasticsearch {
# password= "password"
# }
# configuration of the Elasticsearch client
client = ${app.defaults.http-client}
client = ${app.defaults.http-client-compression}
# the elasticsearch event log configuration
event-log = ${app.defaults.event-log}
# the elasticsearch pagination config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object ElasticSearchClientSetup extends CirceLiteral {

def resource()(implicit s: Scheduler): Resource[Task, ElasticSearchClient] = {
for {
(httpClient, actorSystem) <- HttpClientSetup()
(httpClient, actorSystem) <- HttpClientSetup(compression = true)
container <- ElasticSearchContainer.resource()
} yield {
implicit val as: ActorSystem = actorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
make[StorageTypeConfig].from { cfg: StoragePluginConfig => cfg.storages.storageTypeConfig }

make[HttpClient].named("storage").from { (as: ActorSystem[Nothing], sc: Scheduler) =>
HttpClient.noRetry()(as.classicSystem, sc)
HttpClient.noRetry(compression = false)(as.classicSystem, sc)
}

make[Storages]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ object HttpClient {
/**
* Construct an Http client using an underlying akka http client which will not retry on failures
*/
final def noRetry()(implicit as: ActorSystem, scheduler: Scheduler): HttpClient = {
implicit val config: HttpClientConfig = HttpClientConfig.noRetry
final def noRetry(compression: Boolean)(implicit as: ActorSystem, scheduler: Scheduler): HttpClient = {
implicit val config: HttpClientConfig = HttpClientConfig.noRetry(compression)
apply()
}

Expand Down Expand Up @@ -119,11 +119,16 @@ object HttpClient {
override def apply[A](
req: HttpRequest
)(handleResponse: PartialFunction[HttpResponse, HttpResult[A]]): HttpResult[A] = {
val reqCompressionSupport = if (httpConfig.compression) req.addHeader(acceptEncoding) else req
val reqCompressionSupport =
if (httpConfig.compression) {
Coders.Gzip.encodeMessage(req).addHeader(acceptEncoding)
} else
req.addHeader(acceptEncoding)

for {
encodedResp <- client.execute(reqCompressionSupport).mapError(toHttpError(req))
resp <- decodeResponse(req, encodedResp)
a <- handleResponse.applyOrElse(resp, resp => consumeEntity[A](req, resp))
encodedResp <- client.execute(reqCompressionSupport).mapError(toHttpError(reqCompressionSupport))
resp <- decodeResponse(reqCompressionSupport, encodedResp)
a <- handleResponse.applyOrElse(resp, resp => consumeEntity[A](reqCompressionSupport, resp))
} yield a
}.retry(httpConfig.strategy)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ch.epfl.bluebrain.nexus.delta.sdk.http

import ch.epfl.bluebrain.nexus.delta.kernel.{RetryStrategy, RetryStrategyConfig}
import com.typesafe.scalalogging.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.{Logger, RetryStrategy, RetryStrategyConfig}
import pureconfig.ConfigReader
import pureconfig.error.CannotConvert
import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClientConfig.logger
Expand Down Expand Up @@ -38,8 +37,8 @@ object HttpClientConfig {

private[http] val logger: Logger = Logger[HttpClientConfig]

val noRetry: HttpClientConfig =
HttpClientConfig(RetryStrategyConfig.AlwaysGiveUp, HttpClientWorthRetry.never, compression = true)
def noRetry(compression: Boolean): HttpClientConfig =
HttpClientConfig(RetryStrategyConfig.AlwaysGiveUp, HttpClientWorthRetry.never, compression = compression)

@nowarn("cat=unused")
implicit private val httpClientWorthRetryConverter: ConfigReader[HttpClientWorthRetry] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait ConfigFixtures {
)

def httpClientConfig: HttpClientConfig =
HttpClientConfig(RetryStrategyConfig.AlwaysGiveUp, HttpClientWorthRetry.never, true)
HttpClientConfig(RetryStrategyConfig.AlwaysGiveUp, HttpClientWorthRetry.never, false)

def fusionConfig: FusionConfig = FusionConfig(Uri("https://bbp.epfl.ch/nexus/web/"), enableRedirects = true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import monix.execution.Scheduler

object HttpClientSetup {

def apply()(implicit s: Scheduler): Resource[Task, (HttpClient, ActorSystem)] = {
def apply(compression: Boolean)(implicit s: Scheduler): Resource[Task, (HttpClient, ActorSystem)] = {
implicit val httpConfig: HttpClientConfig =
HttpClientConfig(RetryStrategyConfig.AlwaysGiveUp, HttpClientWorthRetry.never, compression = true)
HttpClientConfig(RetryStrategyConfig.AlwaysGiveUp, HttpClientWorthRetry.never, compression = compression)
Resource
.make[Task, ActorSystem](Task.delay(ActorSystem()))((as: ActorSystem) => Task.delay(as.terminate()).void)
.map { implicit as =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,15 @@ class HttpClientSpec
private val value2 = Value("second", 2, deprecated = true)

private val baseUri = Uri("http://localhost/v1")
private val reqGetValue = HttpRequest(uri = baseUri / s"values/first")
private val getUri = baseUri / s"values/first"
private val reqGetValue = HttpRequest(uri = getUri)
private val count = Count()
private val reqStreamValues = HttpRequest(uri = baseUri / "values/events")
private val reqClientError = HttpRequest(uri = baseUri / "values/errors/client")
private val reqServerError = HttpRequest(uri = baseUri / "values/errors/server")
private val streamUri = baseUri / "values/events"
private val reqStreamValues = HttpRequest(uri = streamUri)
private val clientErrorUri = baseUri / "values/errors/client"
private val reqClientError = HttpRequest(uri = clientErrorUri)
private val serverErrorUri = baseUri / "values/errors/server"
private val reqServerError = HttpRequest(uri = serverErrorUri)

private def toSource(values: List[Json]): AkkaSource =
Source(values.map(j => ByteString(j.noSpaces)))
Expand All @@ -63,22 +67,22 @@ class HttpClientSpec

val httpSingleReq = new HttpSingleRequest {
override def execute(request: HttpRequest): Task[HttpResponse] =
request match {
case `reqGetValue` =>
request.uri match {
case `getUri` =>
Task.delay(count.reqGetValue.incrementAndGet()) >>
Task(response(HttpEntity(`application/json`, value1.asJson.noSpaces)))
case `reqStreamValues` =>
case `streamUri` =>
Task.delay(count.reqStreamValues.incrementAndGet()) >>
Task(response(HttpEntity(`application/octet-stream`, toSource(List(value1.asJson, value2.asJson)))))
case `reqClientError` =>
case `clientErrorUri` =>
Task.delay(count.reqClientError.incrementAndGet()) >>
Task(response(HttpEntity(`application/json`, json"""{"error": "client"}""".noSpaces), BadRequest))
case `reqServerError` =>
case `serverErrorUri` =>
Task.delay(count.reqServerError.incrementAndGet()) >>
Task(
response(HttpEntity(`application/json`, json"""{"error": "server"}""".noSpaces), InternalServerError)
)
case _ =>
case _ =>
Task.delay(count.reqOtherError.incrementAndGet()) >>
Task.raiseError(new IllegalArgumentException("wrong request"))
}
Expand Down
4 changes: 4 additions & 0 deletions docs/src/main/paradox/docs/releases/v1.9-release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ To improve indexing performance, the types defined in the
are filtered in PostgreSQL rather than in Nexus Delta.
This avoids querying for data just to discard it straight away.

#### Compressing requests to Elasticsearch

The various requests to Elasticsearch are now compressed by default, reducing I/O, especially during indexing.

### Composite views

To enhance performance of indexing of composite views, Nexus Delta introduces the following features.
Expand Down

0 comments on commit 46d110a

Please sign in to comment.