diff --git a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala index 911ec085e0..b46dc583b2 100644 --- a/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala +++ b/delta/plugins/composite-views/src/main/scala/ch/epfl/bluebrain/nexus/delta/plugins/compositeviews/CompositeSink.scala @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewP import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel} import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSearchSink, GraphResourceToDocument} +import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi} import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution} @@ -55,6 +56,7 @@ final class Single[SinkFormat]( queryGraph: SingleQueryGraph, transform: GraphResource => Task[Option[SinkFormat]], sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]], + targetTypes: Set[Iri], override val chunkSize: Int, override val maxWindow: FiniteDuration ) extends CompositeSink { @@ -68,8 +70,13 @@ final class Single[SinkFormat]( transformed <- graph.flatTraverse(transform) } yield transformed + private def noMatchingType: GraphResource => Boolean = gr => + if (targetTypes.isEmpty) false + else !gr.types.exists(targetTypes.contains) + override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] = elements + .map { e => e.dropWhen(noMatchingType) } .traverse { case e: SuccessElem[GraphResource] => 
e.evalMapFilter(queryTransform) case e: DroppedElem => Task.pure(e) @@ -99,6 +106,7 @@ final class Batch[SinkFormat]( queryGraph: BatchQueryGraph, transform: GraphResource => Task[Option[SinkFormat]], sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]], + targetTypes: Set[Iri], override val chunkSize: Int, override val maxWindow: FiniteDuration )(implicit rcr: RemoteContextResolution) @@ -128,9 +136,14 @@ final class Batch[SinkFormat]( .map(g => gr.copy(graph = g.replaceRootNode(gr.id))) } - override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] = + private def noMatchingType: GraphResource => Boolean = gr => + if (targetTypes.isEmpty) false + else !gr.types.exists(targetTypes.contains) + + override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] = { + val filteredElements = elements.map { e => e.dropWhen(noMatchingType) } for { - graph <- query(elements).span("batchQueryGraph") + graph <- query(filteredElements).span("batchQueryGraph") transformed <- graph match { case Some(fullGraph) => - elements.traverse { elem => + filteredElements.traverse { elem => @@ -143,6 +156,7 @@ final class Batch[SinkFormat]( } sank <- sink(transformed) } yield sank + } } object CompositeSink { @@ -171,6 +185,7 @@ object CompositeSink { target.query, GraphResourceToNTriples.graphToNTriples, BlazegraphSink(blazeClient, cfg.blazegraphBatch, namespace).apply, + target.resourceTypes, cfg.blazegraphBatch, cfg.sinkConfig ) @@ -211,6 +226,7 @@ object CompositeSink { target.query, new GraphResourceToDocument(target.context, target.includeContext).graphToDocument, esSink.apply, + target.resourceTypes, cfg.elasticsearchBatch, cfg.sinkConfig ) @@ -222,6 +238,7 @@ object CompositeSink { query: SparqlConstructQuery, transform: GraphResource => Task[Option[SinkFormat]], sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]], + types: Set[Iri], batchConfig: BatchConfig, sinkConfig: SinkConfig )(implicit rcr: RemoteContextResolution): CompositeSink = sinkConfig match { @@ -230,6 
+247,7 @@ object CompositeSink { new SingleQueryGraph(blazeClient, common, query), transform, sink, + types, batchConfig.maxElements, batchConfig.maxInterval ) @@ -238,6 +256,7 @@ object CompositeSink { new BatchQueryGraph(blazeClient, common, query), transform, sink, + types, batchConfig.maxElements, batchConfig.maxInterval ) diff --git a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala index 63d05219e1..ff253c0d87 100644 --- a/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala +++ b/delta/sourcing-psql/src/main/scala/ch/epfl/bluebrain/nexus/delta/sourcing/stream/Elem.scala @@ -85,6 +85,12 @@ sealed trait Elem[+A] extends Product with Serializable { case e: DroppedElem => e } + /** Drop a [[SuccessElem]] only if the provided condition is met */ + def dropWhen(cond: A => Boolean): Elem[A] = this match { + case e: SuccessElem[A] if cond(e.value) => e.dropped + case e => e + } + /** * Maps the underlying element value if this is a [[Elem.SuccessElem]] using f. * @param f