From fbc1d9ab9ede3106f6f1a486e40e9b92f95fe438 Mon Sep 17 00:00:00 2001 From: Simon Dumas Date: Tue, 13 Aug 2024 16:14:36 +0200 Subject: [PATCH 1/2] Allow to reset event metrics indexing at startup --- .../ElasticSearchPluginModule.scala | 2 ++ .../elasticsearch/EventMetricsProjection.scala | 17 ++++++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala index dfb179a461..03283385f3 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/ElasticSearchPluginModule.scala @@ -152,6 +152,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { metricEncoders: Set[ScopedEventMetricEncoder[_]], xas: Transactors, supervisor: Supervisor, + projections: Projections, client: ElasticSearchClient, config: ElasticSearchViewsConfig, files: ElasticSearchFiles @@ -159,6 +160,7 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef { EventMetricsProjection( metricEncoders, supervisor, + projections, client, xas, config.batch, diff --git a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala index 71c1136189..3d7788fa96 100644 --- a/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala +++ b/delta/plugins/elasticsearch/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/elasticsearch/EventMetricsProjection.scala @@ -2,6 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch import cats.data.NonEmptyChain import cats.effect.IO +import cats.effect.std.Env +import ch.epfl.bluebrain.nexus.delta.kernel.Logger import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.ElasticSearchSink @@ -12,6 +14,7 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.config.{BatchConfig, QueryConfig} import ch.epfl.bluebrain.nexus.delta.sourcing.event.EventStreaming import ch.epfl.bluebrain.nexus.delta.sourcing.model.SuccessElemStream import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset +import ch.epfl.bluebrain.nexus.delta.sourcing.projections.Projections import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Operation.Sink import ch.epfl.bluebrain.nexus.delta.sourcing.stream._ import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.AsJson @@ -20,6 +23,8 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.{MultiDecoder, Scope, Transactors} trait EventMetricsProjection object EventMetricsProjection { + private val logger = Logger[EventMetricsProjection] + val projectionMetadata: ProjectionMetadata = ProjectionMetadata("system", "event-metrics", None, None) val eventMetricsIndex: String => IndexLabel = prefix => IndexLabel.unsafe(s"${prefix}_project_metrics") @@ -49,6 +54,7 @@ object EventMetricsProjection { def apply( metricEncoders: Set[ScopedEventMetricEncoder[_]], supervisor: Supervisor, + projections: Projections, client: ElasticSearchClient, xas: Transactors, batchConfig: BatchConfig, @@ -73,7 +79,16 @@ object EventMetricsProjection { val createIndex = client.createIndex(index, Some(metricMappings.value), Some(metricsSettings.value)).void - apply(sink, supervisor, metrics, createIndex) + for { + shouldRestart <- Env[IO].get("RESET_EVENT_METRICS").map(_.getOrElse("false").toBoolean) + _ <- IO.whenA(shouldRestart)( + logger.warn("Resetting event metrics as the env RESET_EVENT_METRICS is set") >> projections.reset( + projectionMetadata.name + ) + ) + metricsProjection <- apply(sink, supervisor, metrics, createIndex) + } yield (metricsProjection) + } else IO.pure(dummy) /** From 84548fa8b9164e91184b86443ecf4442085eee4f Mon Sep 17 00:00:00 2001 From: Simon Dumas Date: Tue, 13 Aug 2024 18:12:27 +0200 Subject: [PATCH 2/2] Scalafmt --- .../tests/kg/SearchConfigIndexingSpec.scala | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigIndexingSpec.scala b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigIndexingSpec.scala index 99dad05d60..ef353070a5 100644 --- a/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigIndexingSpec.scala +++ b/tests/src/test/scala/ch/epfl/bluebrain/nexus/tests/kg/SearchConfigIndexingSpec.scala @@ -22,23 +22,23 @@ class SearchConfigIndexingSpec extends BaseIntegrationSpec { private val projId1 = genId() private val id1 = s"$orgId/$projId1" - private val neuronMorphologyId = "https://bbp.epfl.ch/data/neuron-morphology" - private val simulationReadyNeuronMorphologyId = "https://bbp.epfl.ch/data/simulation-ready-neuron-morphology" - private val neuronDensityId = "https://bbp.epfl.ch/data/neuron-density" - private val traceId = "https://bbp.epfl.ch/data/trace" - private val curatedTraceId = "https://bbp.epfl.ch/data/curated-trace" - private val unassessedTraceId = "https://bbp.epfl.ch/data/unassessed-trace" - private val layerThicknessId = "https://bbp.epfl.ch/data/layer-thickness" - private val boutonDensityId = "https://bbp.epfl.ch/data/bouton-density" - private val simulationCampaignId = "https://bbp.epfl.ch/data/simulation-campaign" - private val simulationId = "https://bbp.epfl.ch/data/simulation" - private val synapseId = "https://bbp.epfl.ch/data/synapse" - private val synapseTwoPathwaysId = "https://bbp.epfl.ch/data/synapse-two-pathways" - private val detailedCircuitId = "https://bbp.epfl.ch/data/detailed-circuit" - private val emodelId = "https://bbp.epfl.ch/data/emodel" - private val memodelId = "https://bbp.epfl.ch/data/memodel" - private val analysisSuitablememodelId = "https://bbp.epfl.ch/data/analysis-suitable-memodel" - private val singleNeuronSimulationId = "https://bbp.epfl.ch/data/synapse/single-neuron-simulation" + private val neuronMorphologyId = "https://bbp.epfl.ch/data/neuron-morphology" + private val simulationReadyNeuronMorphologyId = "https://bbp.epfl.ch/data/simulation-ready-neuron-morphology" + private val neuronDensityId = "https://bbp.epfl.ch/data/neuron-density" + private val traceId = "https://bbp.epfl.ch/data/trace" + private val curatedTraceId = "https://bbp.epfl.ch/data/curated-trace" + private val unassessedTraceId = "https://bbp.epfl.ch/data/unassessed-trace" + private val layerThicknessId = "https://bbp.epfl.ch/data/layer-thickness" + private val boutonDensityId = "https://bbp.epfl.ch/data/bouton-density" + private val simulationCampaignId = "https://bbp.epfl.ch/data/simulation-campaign" + private val simulationId = "https://bbp.epfl.ch/data/simulation" + private val synapseId = "https://bbp.epfl.ch/data/synapse" + private val synapseTwoPathwaysId = "https://bbp.epfl.ch/data/synapse-two-pathways" + private val detailedCircuitId = "https://bbp.epfl.ch/data/detailed-circuit" + private val emodelId = "https://bbp.epfl.ch/data/emodel" + private val memodelId = "https://bbp.epfl.ch/data/memodel" + private val analysisSuitablememodelId = "https://bbp.epfl.ch/data/analysis-suitable-memodel" + private val singleNeuronSimulationId = "https://bbp.epfl.ch/data/synapse/single-neuron-simulation" // the resources that should appear in the search index private val mainResources = List( @@ -398,12 +398,12 @@ class SearchConfigIndexingSpec extends BaseIntegrationSpec { } "have simulationReady field true if annotation is present" in { - val query = queryField(simulationReadyNeuronMorphologyId, "simulationReady") + val query = queryField(simulationReadyNeuronMorphologyId, "simulationReady") - assertOneSource(query) { json => - json shouldBe json"""{ "simulationReady": true }""" + assertOneSource(query) { json => + json shouldBe json"""{ "simulationReady": true }""" + } } - } "have the correct mType property" in { val query = queryField(traceId, "mType")