diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/service/InternalVectorPipelineIngestionService.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/service/InternalVectorPipelineIngestionService.java index 065a89223..bc4aad843 100644 --- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/service/InternalVectorPipelineIngestionService.java +++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/service/InternalVectorPipelineIngestionService.java @@ -82,48 +82,68 @@ Uni send(SendRequest sendRequest) { var vScheduleId = scheduleId + VectorPipeline.VECTOR_PIPELINE_SUFFIX; return QuarkusCacheUtil.getAsync( - cache, - new CompositeCacheKey(scheduleId), - sessionFactory.withStatelessSession(tenantId, (s -> s - .createNamedQuery(VectorIndex.FETCH_BY_SCHEDULE_ID, VectorIndex.class) - .setParameter("scheduleId", scheduleId) - .getSingleResult() - .flatMap(vectorIndex -> s.createNamedQuery( - EmbeddingModel.FETCH_CURRENT, - EmbeddingModel.class + cache, + new CompositeCacheKey(scheduleId), + sessionFactory.withStatelessSession(tenantId, s -> s + .createNamedQuery( + VectorIndex.FETCH_BY_SCHEDULE_ID, VectorIndex.class) + .setParameter("scheduleId", scheduleId) + .getSingleResult() + .flatMap(ignore -> s.createNamedQuery( + EmbeddingModel.FETCH_CURRENT, + EmbeddingModel.class + ) + .getSingleResult() + ) ) - .getSingleResult() + .map(ignore -> Status.ENABLED) + .onFailure().recoverWithItem(Status.DISABLED) ) - )) - ).onItem().invoke(() -> { + .invoke((status) -> { - log.infof("VectorIndex is active for scheduleId %s", scheduleId); + switch (status) { + case ENABLED -> { - var payload = sendRequest.payload(); + log.infof("VectorIndex is active for scheduleId %s", scheduleId); - var dataPayload = Json.decodeValue(Buffer.buffer(payload), DataPayload.class); - var ingestionPayload = ingestionPayloadMapper.map(dataPayload); + var payload = sendRequest.payload(); - var metadata = Metadata.of( - OutgoingRabbitMQMetadata - .builder() - .withRoutingKey(ShardingKey.asString(tenantId, vScheduleId)) - .withDeliveryMode(2) - .build() - ); + var dataPayload = Json.decodeValue( + Buffer.buffer(payload), DataPayload.class); - var ingestionIndexWriterPayload = IngestionIndexWriterPayload.builder() - .ingestionPayload(ingestionPayload) - .build(); + var ingestionPayload = ingestionPayloadMapper.map(dataPayload); - emitter.send(Message.of(ingestionIndexWriterPayload, metadata)); + var metadata = Metadata.of( + OutgoingRabbitMQMetadata + .builder() + .withRoutingKey(ShardingKey.asString(tenantId, vScheduleId)) + .withDeliveryMode(2) + .build() + ); - }).onFailure().invoke((throwable) -> - log.infof("No active vector index for scheduleId %s", scheduleId) - ).replaceWithVoid(); + var ingestionIndexWriterPayload = IngestionIndexWriterPayload.builder() + .ingestionPayload(ingestionPayload) + .build(); + + emitter.send(Message.of(ingestionIndexWriterPayload, metadata)); + + } + case DISABLED -> log.infof( + "No active vector index for scheduleId %s", scheduleId); + } + }) + .onFailure() + .invoke(throwable -> log.errorf( + throwable, "Cannot send payload to VectorPipeline for scheduleId %s", scheduleId) + ) + .replaceWithVoid(); } private record SendRequest(ShardingKey shardingKey, byte[] payload) {} + private enum Status { + ENABLED, + DISABLED + } }