Skip to content

Commit

Permalink
Initial type filter for composite projections
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski committed Oct 11, 2023
1 parent cfa7f5e commit b27ac89
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewP
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient.Refresh
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.{ElasticSearchClient, IndexLabel}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.indexing.{ElasticSearchSink, GraphResourceToDocument}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.graph.Graph
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.{JsonLdApi, JsonLdJavaApi}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
Expand Down Expand Up @@ -55,6 +56,7 @@ final class Single[SinkFormat](
queryGraph: SingleQueryGraph,
transform: GraphResource => Task[Option[SinkFormat]],
sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]],
targetTypes: Set[Iri],
override val chunkSize: Int,
override val maxWindow: FiniteDuration
) extends CompositeSink {
Expand All @@ -68,8 +70,13 @@ final class Single[SinkFormat](
transformed <- graph.flatTraverse(transform)
} yield transformed

private def noMatchingType: GraphResource => Boolean = gr =>
if (targetTypes.isEmpty) false
else !gr.types.exists(targetTypes.contains)

override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] =
elements
.map { e => e.dropWhen(noMatchingType) }
.traverse {
case e: SuccessElem[GraphResource] => e.evalMapFilter(queryTransform)
case e: DroppedElem => Task.pure(e)
Expand Down Expand Up @@ -99,6 +106,7 @@ final class Batch[SinkFormat](
queryGraph: BatchQueryGraph,
transform: GraphResource => Task[Option[SinkFormat]],
sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]],
targetTypes: Set[Iri],
override val chunkSize: Int,
override val maxWindow: FiniteDuration
)(implicit rcr: RemoteContextResolution)
Expand Down Expand Up @@ -128,9 +136,14 @@ final class Batch[SinkFormat](
.map(g => gr.copy(graph = g.replaceRootNode(gr.id)))
}

override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] =
private def noMatchingType: GraphResource => Boolean = gr =>
if (targetTypes.isEmpty) false
else !gr.types.exists(targetTypes.contains)

override def apply(elements: Chunk[Elem[GraphResource]]): Task[Chunk[Elem[Unit]]] = {
val filteredElements = elements.map { e => e.dropWhen(noMatchingType) }
for {
graph <- query(elements).span("batchQueryGraph")
graph <- query(filteredElements).span("batchQueryGraph")
transformed <- graph match {
case Some(fullGraph) =>
elements.traverse { elem =>
Expand All @@ -143,6 +156,7 @@ final class Batch[SinkFormat](
}
sank <- sink(transformed)
} yield sank
}
}

object CompositeSink {
Expand Down Expand Up @@ -171,6 +185,7 @@ object CompositeSink {
target.query,
GraphResourceToNTriples.graphToNTriples,
BlazegraphSink(blazeClient, cfg.blazegraphBatch, namespace).apply,
target.resourceTypes,
cfg.blazegraphBatch,
cfg.sinkConfig
)
Expand Down Expand Up @@ -211,6 +226,7 @@ object CompositeSink {
target.query,
new GraphResourceToDocument(target.context, target.includeContext).graphToDocument,
esSink.apply,
target.resourceTypes,
cfg.elasticsearchBatch,
cfg.sinkConfig
)
Expand All @@ -222,6 +238,7 @@ object CompositeSink {
query: SparqlConstructQuery,
transform: GraphResource => Task[Option[SinkFormat]],
sink: Chunk[Elem[SinkFormat]] => Task[Chunk[Elem[Unit]]],
types: Set[Iri],
batchConfig: BatchConfig,
sinkConfig: SinkConfig
)(implicit rcr: RemoteContextResolution): CompositeSink = sinkConfig match {
Expand All @@ -230,6 +247,7 @@ object CompositeSink {
new SingleQueryGraph(blazeClient, common, query),
transform,
sink,
types,
batchConfig.maxElements,
batchConfig.maxInterval
)
Expand All @@ -238,6 +256,7 @@ object CompositeSink {
new BatchQueryGraph(blazeClient, common, query),
transform,
sink,
types,
batchConfig.maxElements,
batchConfig.maxInterval
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ sealed trait Elem[+A] extends Product with Serializable {
case e: DroppedElem => e
}

/** Drop a [[SuccessElem]] only if the provided condition is met */
def dropWhen(cond: A => Boolean): Elem[A] = this match {
case e: SuccessElem[A] if cond(e.value) => e.dropped
case e => e
}

/**
* Maps the underlying element value if this is a [[Elem.SuccessElem]] using f.
* @param f
Expand Down

0 comments on commit b27ac89

Please sign in to comment.