diff --git a/.github/Dockerfile b/.github/Dockerfile index 91436a344b..9f222c04c3 100644 --- a/.github/Dockerfile +++ b/.github/Dockerfile @@ -15,6 +15,8 @@ COPY ./ldes-server-ingest/ldes-server-ingest-rest/target/ldes-server-ingest-rest COPY ./ldes-server-fetch/ldes-server-fetch-rest/target/ldes-server-fetch-rest-jar-with-dependencies.jar ./lib/ COPY ./ldes-server-admin/ldes-server-admin-rest/target/ldes-server-admin-rest-jar-with-dependencies.jar ./lib/ +COPY ./ldes-server-ingest/ldes-server-ingest-kafka/target/ldes-server-ingest-kafka-jar-with-dependencies.jar ./lib/ + # Plugin Fragmentations COPY ./ldes-server-fragmentation/ldes-server-fragmentation-geospatial/target/ldes-server-fragmentation-geospatial-jar-with-dependencies.jar ./lib/ COPY ./ldes-server-fragmentation/ldes-server-fragmentation-timebased-hierarchical/target/ldes-server-fragmentation-timebased-hierarchical-jar-with-dependencies.jar ./lib/ diff --git a/README.md b/README.md index 34749faa0a..69103ce10a 100644 --- a/README.md +++ b/README.md @@ -52,18 +52,19 @@ Each maven profile represents a different functionality of the LDES server that The default exported image contains all the profiles, but a custom image can be created with only the needed dependencies. -| Profile | Dependencies | Description | -|----------------------------|--------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------| -| `fragmentation` | `postgres-pagination-repository` | Allows basic fragmentation (pagination) | -| `maintenance` | `postgres-maintenance-repository` | Allows the LDES server to perform maintenance operations on its Event Streams and Members: compaction, retention, deletion. | -| **Interfaces** | | | -| `http-admin` | `ldes-server-admin-rest`,`postgres-admin-repository`* | Gives access to REST API to create and manage Event Streams and Views. | -| `http-ingest` | `ldes-server-ingest-rest`,`postgres-ingest-repository` | Gives access to REST API to ingest members into the LDES. | -| `http-fetch` | `ldes-server-fetch-rest`,`postgres-fetch-repository` | Gives access to REST API to fetch Event Streams, its Views and pages. | -| **Plugin Fragmentations** | | | -| `fragmentation-timebased` | `ldes-server-fragmentation-timebased-hierarchical` | Allows fragmentation in based on a timebased property. | -| `fragmentation-geospatial` | `ldes-server-fragmentation-geospatial` | Allows fragmentation in based on a geospatial property. | -| `fragmentation-reference` | `ldes-server-fragmentation-reference` | Allows fragmentation in based on a textual property. | +| Profile | Dependencies | Description | +|----------------------------|---------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------| +| `fragmentation` | `postgres-pagination-repository` | Allows basic fragmentation (pagination) | +| `maintenance` | `postgres-maintenance-repository` | Allows the LDES server to perform maintenance operations on its Event Streams and Members: compaction, retention, deletion. | +| **Interfaces** | | | +| `http-admin` | `ldes-server-admin-rest`,`postgres-admin-repository`* | Gives access to REST API to create and manage Event Streams and Views. | +| `http-ingest` | `ldes-server-ingest-rest`,`postgres-ingest-repository` | Gives access to REST API to ingest members into the LDES. | +| `kafka-ingest` | `ldes-server-ingest-kafka`,`postgres-ingest-repository` | Allows Kafka member ingestion into the LDES. | +| `http-fetch` | `ldes-server-fetch-rest`,`postgres-fetch-repository` | Gives access to REST API to fetch Event Streams, its Views and pages. | +| **Plugin Fragmentations** | | | +| `fragmentation-timebased` | `ldes-server-fragmentation-timebased-hierarchical` | Allows fragmentation in based on a timebased property. | +| `fragmentation-geospatial` | `ldes-server-fragmentation-geospatial` | Allows fragmentation in based on a geospatial property. | +| `fragmentation-reference` | `ldes-server-fragmentation-reference` | Allows fragmentation in based on a textual property. | *: The `postgres-admin-repository`, as shown by the dependency graph, will be loaded in by the other above-mentioned functionality profiles. diff --git a/content/dependency-graph.dot b/content/dependency-graph.dot index e7a0e1f7e5..6b49bdb304 100644 --- a/content/dependency-graph.dot +++ b/content/dependency-graph.dot @@ -14,6 +14,7 @@ digraph "ldes-server" { "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-common:jar:compile"[label=] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-rest:jar:compile"[label=, fillcolor="#87CEFA", style="filled"] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-common:jar:compile"[label=] + "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-kafka:jar:compile"[label=, fillcolor="#87CEFA", style="filled"] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-retention:jar:compile"[label=] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-maintenance-common:jar:compile"[label=] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-compaction:jar:compile"[label=] @@ -33,6 +34,7 @@ digraph "ldes-server" { "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fragmentation-reference:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fragmentation-common:jar:compile" "be.vlaanderen.informatievlaanderen.vsds:ldes-server-admin-rest:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-admin-common:jar:compile" "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-rest:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-common:jar:compile" + "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-kafka:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-common:jar:compile" "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-rest:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-common:jar:compile" "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-rest:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-admin-common:jar:compile" "be.vlaanderen.informatievlaanderen.vsds:ldes-server-retention:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-maintenance-common:jar:compile" @@ -52,6 +54,7 @@ digraph "ldes-server" { "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-liquibase:jar:compile" [label="Database Changes"] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-rest:jar:compile" [label="Ingest REST API"] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-ingest-repository:jar:compile" [label="Ingest Implementation"] + "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-kafka:jar:compile" [label="Ingest Kafka"] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-rest:jar:compile" [label="Fetch REST API"] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-fetch-repository:jar:compile" [label="Fetch Implementation"] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-pagination-repository:jar:compile" [label="Basic Fragmentation Implementation"] @@ -60,4 +63,5 @@ digraph "ldes-server" { "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fragmentation-reference:jar:compile" [label="Plugin Fragmentation"] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-admin-rest:jar:compile" [label="Admin REST API"] "be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-maintenance-repository:jar:compile" [label="Maintenance Implementation"] + } \ No newline at end of file diff --git a/content/ldes-server-graph.png b/content/ldes-server-graph.png index 8ea0f4d5c8..70238a1b32 100644 Binary files a/content/ldes-server-graph.png and b/content/ldes-server-graph.png differ diff --git a/docker-compose/kafka/docker-compose.yml b/docker-compose/kafka/docker-compose.yml new file mode 100644 index 0000000000..97385ae8de --- /dev/null +++ b/docker-compose/kafka/docker-compose.yml @@ -0,0 +1,71 @@ +version: "3.3" +services: + ldes-server: + container_name: basic_ldes-server + image: ldes/ldes-server + environment: + - SPRING_CONFIG_LOCATION=/config/ + - PYROSCOPE_CONFIGURATION_FILE=/config/pyroscope.properties + volumes: + - ./docker-compose/server.config.yml:/config/application.yml:ro + - ./docker-compose/pyroscope.properties:/config/pyroscope.properties:ro + ports: + - "8080:8080" + - "8087:8087" + - "8088:8088" + - "8089:8089" + networks: + - ldes + depends_on: + - postgres + postgres: + container_name: ldes-postgres + image: postgres:14-alpine + ports: + - 5432:5432 + environment: + - POSTGRES_PASSWORD=admin + - POSTGRES_USER=admin + - POSTGRES_DB=test + networks: + - ldes + pyroscope: + image: grafana/pyroscope:latest + ports: + - 4040:4040 + networks: + - ldes + + zookeeper: + image: zookeeper:3.7.0 + ports: + - "2181:2181" + + kafka: + image: confluentinc/cp-kafka:7.3.2 + ports: + - "9092:9092" + - "29092:29092" + depends_on: + - zookeeper + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://0.0.0.0:29092 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + ALLOW_PLAINTEXT_LISTENER: 'yes' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + + kafka-ui: + image: provectuslabs/kafka-ui:latest + depends_on: + - kafka + ports: + - "9200:8080" + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 +networks: + ldes: \ No newline at end of file diff --git a/docs/_configuration/event-stream.md b/docs/_configuration/event-stream.md index 6b3fe92f66..044144c488 100644 --- a/docs/_configuration/event-stream.md +++ b/docs/_configuration/event-stream.md @@ -64,6 +64,10 @@ server:generic-eventstream a ldes:EventStream ; genericES:shape a sh:NodeShape . ```` +## Configuring an Event Stream with a Kafka source + +To configure an Event Stream that ingests members from a Kafka topic, please visit the [Kafka Ingest documentation](../ingest/kafka). + ## Configuring a SHACL Shape [SHACL (Shapes Constraint Language)](https://www.w3.org./TR/shacl/) is a standard for validating RDF data and ensuring diff --git a/docs/_ingest/kafka.md b/docs/_ingest/kafka.md new file mode 100644 index 0000000000..4c9bf7e578 --- /dev/null +++ b/docs/_ingest/kafka.md @@ -0,0 +1,70 @@ +--- +layout: default +title: Ingest Members With Kafka +nav_order: 0 +--- + +# Ingest Members With Kafka + +Ingesting members into an Event Stream without too much overhead? +That's now possible via ingestion over Apache Kafka. + +## Getting Started + +To get started with ingesting members via Kafka, you need to have the following: +* Kafka consumer configuration (in the Application Properties) +* Event Stream configuration pointing to a Kafka topic (in the Admin API) + +### Application Properties + +The Kafka consumer configuration can be set in the `application.properties` file. + +The most basic properties that are needed are: +````yaml +spring.kafka.consumer.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=my-group +```` + +To guarantee that the Kafka consumer will always read from the beginning of the topic, you can add the following property: +````yaml +spring.kafka.consumer.auto-offset-reset=earliest +```` + +For more advanced options to configure advanced Kafka connections, please refer to the [Spring Kafka documentation](https://docs.spring.io/spring-boot/appendix/application-properties/index.html). + +### Event Stream Configuration + +To configure a new Event Stream that uses Kafka as the ingestion method, +you need to create an Event Stream configuration that points to a Kafka topic. \ +This can be done by adding a `https://w3id.org/ldes#KafkaEventStream` object to the Event Stream configuration. + +This object should contain the following properties: +* `ldes:topic` - The Kafka topic to which the members should be ingested. +* `ldes:mimeType` - The mime type in which the data of your topic will be. This is used to parse your member to a model. \ + This can be `application/ld+json`, `application/json`, `text/turtle`, ... \ + All members in your topic need to therefor be in one mime type. + +#### Example + +Creating a generic Event Stream named "event-stream" that uses Kafka as the ingestion method. + +````turtle +@prefix ldes: . +@prefix dcterms: . +@prefix prov: . +@prefix tree: . +@prefix sh: . +@prefix server: . +@prefix xsd: . +@prefix event-stream: . + +server:event-stream a ldes:EventStream ; + ldes:timestampPath dcterms:created ; + ldes:versionOfPath dcterms:isVersionOf ; + tree:shape [ a sh:NodeShape ] ; + ldes:kafkaSource [ + ldes:topic "testTopic" ; + ldes:mimeType "application/n-quads" ; + ] . + +```` \ No newline at end of file diff --git a/ldes-server-admin/ldes-server-admin-common/pom.xml b/ldes-server-admin/ldes-server-admin-common/pom.xml index 5c19354984..d579b52474 100644 --- a/ldes-server-admin/ldes-server-admin-common/pom.xml +++ b/ldes-server-admin/ldes-server-admin-common/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-admin - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-admin-common diff --git a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventsource/services/EventSourceServiceImpl.java b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventsource/services/EventSourceServiceImpl.java index 7ab31b5bd0..d41d5ce1af 100644 --- a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventsource/services/EventSourceServiceImpl.java +++ b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventsource/services/EventSourceServiceImpl.java @@ -44,6 +44,6 @@ public void initViews() { repository .getAllEventSources() .forEach(eventSource -> eventPublisher - .publishEvent(new DeletionPolicyChangedEvent(eventSource.getCollectionName(), eventSource.getRetentionPolicies()))); + .publishEvent(new DeletionPolicyChangedEvent(eventSource.collectionName(), eventSource.retentionPolicies()))); } } diff --git a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/repository/EventStreamRepository.java b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/repository/EventStreamRepository.java index ef9f344e0f..aa48159fae 100644 --- a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/repository/EventStreamRepository.java +++ b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/repository/EventStreamRepository.java @@ -15,7 +15,7 @@ public interface EventStreamRepository { Optional retrieveEventStreamTO(String collectionName); - void saveEventStream(EventStreamTO eventStreamTO); + Integer saveEventStream(EventStreamTO eventStreamTO); int deleteEventStream(String collectionName); diff --git a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/services/EventStreamServiceImpl.java b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/services/EventStreamServiceImpl.java index 6e2bc0896d..9cf6cbd644 100644 --- a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/services/EventStreamServiceImpl.java +++ b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/services/EventStreamServiceImpl.java @@ -3,11 +3,13 @@ import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.dcat.dcatserver.services.DcatServerService; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventsource.services.EventSourceService; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventstream.repository.EventStreamRepository; +import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.kafkasource.KafkaSourceRepository; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.view.service.ViewValidator; import be.vlaanderen.informatievlaanderen.ldes.server.admin.spi.EventStreamTO; import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.*; import be.vlaanderen.informatievlaanderen.ldes.server.domain.exceptions.MissingResourceException; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; +import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.KafkaSourceProperties; import org.apache.jena.rdf.model.Model; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationEventPublisher; @@ -15,20 +17,23 @@ import org.springframework.stereotype.Service; import java.util.List; +import java.util.Optional; @Service public class EventStreamServiceImpl implements EventStreamService { public static final String RESOURCE_TYPE = "eventstream"; private final EventStreamRepository eventStreamRepository; + private final KafkaSourceRepository kafkaSourceRepository; private final DcatServerService dcatServerService; private final EventSourceService eventSourceService; private final ApplicationEventPublisher eventPublisher; private final ViewValidator viewValidator; - public EventStreamServiceImpl(EventStreamRepository eventStreamRepository, + public EventStreamServiceImpl(EventStreamRepository eventStreamRepository, KafkaSourceRepository kafkaSourceRepository, DcatServerService dcatServerService, EventSourceService eventSourceService, ApplicationEventPublisher eventPublisher, ViewValidator viewValidator) { this.eventStreamRepository = eventStreamRepository; + this.kafkaSourceRepository = kafkaSourceRepository; this.dcatServerService = dcatServerService; this.eventSourceService = eventSourceService; this.eventPublisher = eventPublisher; @@ -60,8 +65,9 @@ public EventStreamTO createEventStream(EventStreamTO eventStreamTO) { checkCollectionDoesNotYetExist(eventStreamTO.getCollection()); eventStreamTO.getViews().forEach(viewValidator::validateView); - eventStreamRepository.saveEventStream(eventStreamTO); + var eventStreamId = eventStreamRepository.saveEventStream(eventStreamTO); publishEventStreamTOCreatedEvents(eventStreamTO); + publishKafkaSource(eventStreamTO.getKafkaSourceProperties(), eventStreamId); return eventStreamTO; } @@ -105,6 +111,9 @@ public void initEventStream() { eventStreamRepository.retrieveAllEventStreams().stream() .map(EventStreamCreatedEvent::new) .forEach(eventPublisher::publishEvent); + kafkaSourceRepository.getAll().stream() + .map(KafkaSourceAddedEvent::new) + .forEach(eventPublisher::publishEvent); } private void publishEventStreamTOCreatedEvents(EventStreamTO eventStreamTO) { @@ -113,4 +122,11 @@ private void publishEventStreamTOCreatedEvents(EventStreamTO eventStreamTO) { eventPublisher.publishEvent(new DeletionPolicyChangedEvent(eventStreamTO.getCollection(), eventStreamTO.getEventSourceRetentionPolicies())); } + private void publishKafkaSource(Optional kafkaSourceProperties, Integer eventStreamId) { + if (kafkaSourceProperties.isPresent()) { + kafkaSourceRepository.save(kafkaSourceProperties.get(), eventStreamId); + eventPublisher.publishEvent(new KafkaSourceAddedEvent(kafkaSourceProperties.get())); + } + } + } diff --git a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/kafkasource/KafkaSourceRepository.java b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/kafkasource/KafkaSourceRepository.java new file mode 100644 index 0000000000..213d21debf --- /dev/null +++ b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/kafkasource/KafkaSourceRepository.java @@ -0,0 +1,10 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.kafkasource; + +import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.KafkaSourceProperties; + +import java.util.List; + +public interface KafkaSourceRepository { + void save(KafkaSourceProperties kafkaSource, Integer eventStreamId); + List getAll(); +} diff --git a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamReader.java b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamReader.java index ea35f30d23..372102b9d8 100644 --- a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamReader.java +++ b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamReader.java @@ -1,6 +1,7 @@ package be.vlaanderen.informatievlaanderen.ldes.server.admin.spi; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventstream.exceptions.MissingStatementException; +import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.KafkaSourceProperties; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.ViewSpecification; import org.apache.jena.rdf.model.*; import org.springframework.stereotype.Component; @@ -18,15 +19,20 @@ public class EventStreamReader { private final ViewSpecificationConverter viewSpecificationConverter; private final RetentionModelExtractor retentionModelExtractor; + private final KafkaSourceReader kafkaSourceReader; - public EventStreamReader(ViewSpecificationConverter viewSpecificationConverter, RetentionModelExtractor retentionModelExtractor) { + public EventStreamReader(ViewSpecificationConverter viewSpecificationConverter, RetentionModelExtractor retentionModelExtractor, KafkaSourceReader kafkaSourceReader) { this.viewSpecificationConverter = viewSpecificationConverter; this.retentionModelExtractor = retentionModelExtractor; + this.kafkaSourceReader = kafkaSourceReader; } public EventStreamTO read(Model model) { final String collection = getIdentifier(model, createResource(EVENT_STREAM_TYPE)).map(Resource::getLocalName) .orElseThrow(() -> new MissingStatementException("Not blank node with type " + EVENT_STREAM_TYPE)); + + final KafkaSourceProperties kafkaSourceProperties = kafkaSourceReader.readKafkaSourceProperties(collection, model); + return new EventStreamTO.Builder() .withCollection(collection) .withTimestampPath(getResource(model, LDES_TIMESTAMP_PATH).orElse(null)) @@ -36,6 +42,7 @@ public EventStreamTO read(Model model) { .withViews(getViews(model, collection)) .withShacl(getShaclFromModel(model)) .withEventSourceRetentionPolicies(getEventSourceRetentionPolicies(model)) + .withKafkaSourceProperties(kafkaSourceProperties) .build(); } diff --git a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamTO.java b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamTO.java index 739364a162..0aa8c3f0ad 100644 --- a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamTO.java +++ b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamTO.java @@ -3,6 +3,7 @@ import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.dcat.dcatdataset.entities.DcatDataset; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventstream.exceptions.InvalidSkolemisationDomainException; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; +import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.KafkaSourceProperties; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.VersionCreationProperties; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.ViewSpecification; import org.apache.jena.rdf.model.Model; @@ -10,6 +11,7 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -18,6 +20,7 @@ public class EventStreamTO { private final String timestampPath; private final String versionOfPath; private final VersionCreationProperties versionCreationProperties; + private final Optional kafkaSourceProperties; private final boolean closed; private final String skolemizationDomain; private final List views; @@ -36,6 +39,7 @@ private EventStreamTO(Builder builder) { shacl = builder.shacl; eventSourceRetentionPolicies = builder.eventSourceRetentionPolicies; dcatDataset = builder.dcatDataset != null ? builder.dcatDataset : new DcatDataset(builder.collection); + kafkaSourceProperties = Optional.ofNullable(builder.kafkaSourceProperties); } public String getCollection() { @@ -58,6 +62,10 @@ public boolean isVersionCreationEnabled() { return versionCreationProperties.isVersionCreationEnabled(); } + public Optional getKafkaSourceProperties() { + return kafkaSourceProperties; + } + public boolean isClosed() { return closed; } @@ -114,6 +122,7 @@ public static final class Builder { private String timestampPath; private String versionOfPath; private VersionCreationProperties versionCreationProperties = VersionCreationProperties.disabled(); + private KafkaSourceProperties kafkaSourceProperties; private boolean closed = false; private String skolemizationDomain; private List views = List.of(); @@ -146,6 +155,11 @@ public Builder withVersionOfPath(String val) { return this; } + public Builder withKafkaSourceProperties(KafkaSourceProperties val) { + kafkaSourceProperties = val; + return this; + } + public Builder withVersionDelimiter(String val) { versionCreationProperties = VersionCreationProperties.ofNullableDelimiter(val); return this; diff --git a/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/KafkaSourceReader.java b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/KafkaSourceReader.java new file mode 100644 index 0000000000..e511ba6f2b --- /dev/null +++ b/ldes-server-admin/ldes-server-admin-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/KafkaSourceReader.java @@ -0,0 +1,57 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.admin.spi; + +import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.KafkaSourceProperties; +import org.apache.jena.rdf.model.Model; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.stereotype.Component; + +import java.util.NoSuchElementException; + +import static be.vlaanderen.informatievlaanderen.ldes.server.domain.constants.RdfConstants.LDES; +import static org.apache.jena.rdf.model.ResourceFactory.createProperty; + +@Component +public class KafkaSourceReader { + private static final String KAFKA_SOURCE = "kafkaSource"; + private static final String KAFKA_BEAN = "kafkaListenerContainerManager"; + private static final String KAFKA_TOPIC = "topic"; + private static final String KAFKA_MIME_TYPE = "mimeType"; + private final DefaultListableBeanFactory beanFactory; + + public KafkaSourceReader(DefaultListableBeanFactory beanFactory) { + this.beanFactory = beanFactory; + } + + public KafkaSourceProperties readKafkaSourceProperties(String collection, Model model) { + + var kafkaSourceStmts = model.listObjectsOfProperty(null, createProperty(LDES, KAFKA_SOURCE)); + if (!kafkaSourceStmts.hasNext()) { + return null; + } + + checkKafkaIngestModuleEnabled(); + + try { + var kafkaSource = kafkaSourceStmts.next(); + String topic = model.listObjectsOfProperty(kafkaSource.asResource(), createProperty(LDES, KAFKA_TOPIC)) + .next() + .asLiteral() + .getString(); + String mimeType = model.listObjectsOfProperty(kafkaSource.asResource(), createProperty(LDES, KAFKA_MIME_TYPE)) + .next() + .asLiteral() + .getString(); + + return new KafkaSourceProperties(collection, topic, mimeType); + } + catch (NoSuchElementException e) { + throw new IllegalArgumentException("KafkaSource properties are missing"); + } + } + + private void checkKafkaIngestModuleEnabled() { + if (!beanFactory.containsBean(KAFKA_BEAN)) { + throw new IllegalStateException("Kafka Ingest module is not enabled"); + } + } +} diff --git a/ldes-server-admin/ldes-server-admin-common/src/main/java/module-info.java b/ldes-server-admin/ldes-server-admin-common/src/main/java/module-info.java index 832f2ac6ea..5a8da52148 100644 --- a/ldes-server-admin/ldes-server-admin-common/src/main/java/module-info.java +++ b/ldes-server-admin/ldes-server-admin-common/src/main/java/module-info.java @@ -12,6 +12,7 @@ exports be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventstream.exceptions; exports be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventstream.repository; exports be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventstream.services; + exports be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.kafkasource; exports be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.shacl.entities; exports be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.shacl.repository; exports be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.shacl.services; diff --git a/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/services/EventStreamServiceImplTest.java b/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/services/EventStreamServiceImplTest.java index b86af54224..70680b4171 100644 --- a/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/services/EventStreamServiceImplTest.java +++ b/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/domain/eventstream/services/EventStreamServiceImplTest.java @@ -5,6 +5,7 @@ import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.dcat.dcatserver.services.DcatServerService; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventsource.services.EventSourceService; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventstream.repository.EventStreamRepository; +import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.kafkasource.KafkaSourceRepository; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.shacl.services.ShaclShapeService; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.view.exception.DuplicateRetentionException; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.view.service.ViewValidator; @@ -56,6 +57,8 @@ class EventStreamServiceImplTest { @Mock private EventStreamRepository eventStreamRepository; @Mock + private KafkaSourceRepository kafkaSourceRepository; + @Mock private ApplicationEventPublisher eventPublisher; @Captor ArgumentCaptor deletedEventArgumentCaptor; diff --git a/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamReaderTest.java b/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamReaderTest.java index 19b40e0de6..3b3e0b3aa3 100644 --- a/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamReaderTest.java +++ b/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamReaderTest.java @@ -22,6 +22,7 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; class EventStreamReaderTest { public static final String TIMESTAMP_PATH = "http://purl.org/dc/terms/created"; @@ -36,7 +37,8 @@ void setUp() { RetentionModelExtractor retentionModelExtractor = new RetentionModelExtractor(); ViewSpecificationConverter viewSpecificationConverter = new ViewSpecificationConverter(retentionModelExtractor, new FragmentationConfigExtractor(), prefixConstructor); - eventStreamReader = new EventStreamReader(viewSpecificationConverter, retentionModelExtractor); + KafkaSourceReader kafkaSourceReader = mock(KafkaSourceReader.class); + eventStreamReader = new EventStreamReader(viewSpecificationConverter, retentionModelExtractor, kafkaSourceReader); shacl = RDFDataMgr.loadModel("shacl/collection-shape.ttl"); } diff --git a/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamTOTest.java b/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamTOTest.java index 142d2ed734..44a8a482a5 100644 --- a/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamTOTest.java +++ b/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/EventStreamTOTest.java @@ -4,6 +4,7 @@ import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventstream.exceptions.InvalidSkolemisationDomainException; import be.vlaanderen.informatievlaanderen.ldes.server.domain.constants.RdfConstants; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; +import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.KafkaSourceProperties; import org.apache.jena.rdf.model.ModelFactory; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtensionContext; @@ -19,6 +20,8 @@ import static org.apache.jena.rdf.model.ResourceFactory.createResource; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; class EventStreamTOTest { private static final String COLLECTION = "collection"; @@ -65,6 +68,19 @@ void test_invalid_skolem_domain() { .hasMessage("Invalid Skolemisation Domain. Should be URI. Provided skolemizationDomain : example.com"); } + @Test + void test_kafka_props() { + var eventStreamTO = getBaseBuilder() + .withKafkaSourceProperties(new KafkaSourceProperties("collection", "topic", "mimeType")) + .build(); + + assertTrue(eventStreamTO.getKafkaSourceProperties().isPresent()); + var kafkaSourceProperties = eventStreamTO.getKafkaSourceProperties().get(); + assertEquals("collection", kafkaSourceProperties.collection()); + assertEquals("topic", kafkaSourceProperties.topic()); + assertEquals("mimeType", kafkaSourceProperties.mimeType()); + } + static class EventStreamResponseArgumentsProvider implements ArgumentsProvider { @Override diff --git a/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/KafkaSourceReaderTest.java b/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/KafkaSourceReaderTest.java new file mode 100644 index 0000000000..7188036086 --- /dev/null +++ b/ldes-server-admin/ldes-server-admin-common/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/spi/KafkaSourceReaderTest.java @@ -0,0 +1,57 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.admin.spi; + +import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.KafkaSourceProperties; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.riot.RDFDataMgr; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.springframework.beans.factory.support.DefaultListableBeanFactory; + +import static org.junit.jupiter.api.Assertions.*; + +class KafkaSourceReaderTest { + + private KafkaSourceReader kafkaSourceReader; + private DefaultListableBeanFactory beanFactory; + + @BeforeEach + void setup() { + beanFactory = new DefaultListableBeanFactory(); + kafkaSourceReader = new KafkaSourceReader(beanFactory); + } + + @Test + void readKafkaSourceProperties_NoKafkaSourcePresent() { + KafkaSourceProperties kafkaSourceProperties = kafkaSourceReader.readKafkaSourceProperties("collection", ModelFactory.createDefaultModel()); + assertNull(kafkaSourceProperties); + } + + @Test + void readKafkaSourceProperties_KafkaIngestNotEnabled() { + final Model eventStreamModel = RDFDataMgr.loadModel("eventstream/streams/with-kafka-source/ldes-with-kafkaES.ttl"); + assertThrows(IllegalStateException.class, ()-> kafkaSourceReader.readKafkaSourceProperties("collection", eventStreamModel)); + } + + @Test + void readKafkaSourceProperties_KafkaIngestEnabled() { + beanFactory.registerSingleton("kafkaListenerContainerManager", Object.class); + final Model eventStreamModel = RDFDataMgr.loadModel("eventstream/streams/with-kafka-source/ldes-with-kafkaES.ttl"); + KafkaSourceProperties kafkaSourceProperties = kafkaSourceReader.readKafkaSourceProperties("collection", eventStreamModel); + assertNotNull(kafkaSourceProperties); + assertEquals("testTopic", kafkaSourceProperties.topic()); + assertEquals("application/n-quads", kafkaSourceProperties.mimeType()); + assertEquals("collection", kafkaSourceProperties.collection()); + } + + @ParameterizedTest() + @ValueSource(strings = {"ldes-with-kafkaES-noTopic.ttl", "ldes-with-kafkaES-noMime.ttl"}) + void readKafkaSourceProperties_PropertyMissing(String fileName) { + beanFactory.registerSingleton("kafkaListenerContainerManager", Object.class); + final Model eventStreamModel = RDFDataMgr.loadModel("eventstream/streams/with-kafka-source/" + fileName); + assertThrows(IllegalArgumentException.class, ()-> kafkaSourceReader.readKafkaSourceProperties("collection", eventStreamModel)); + } + +} \ No newline at end of file diff --git a/ldes-server-admin/ldes-server-admin-common/src/test/resources/eventstream/streams/with-kafka-source/ldes-with-kafkaES-noMime.ttl b/ldes-server-admin/ldes-server-admin-common/src/test/resources/eventstream/streams/with-kafka-source/ldes-with-kafkaES-noMime.ttl new file mode 100644 index 0000000000..652fee5444 --- /dev/null +++ b/ldes-server-admin/ldes-server-admin-common/src/test/resources/eventstream/streams/with-kafka-source/ldes-with-kafkaES-noMime.ttl @@ -0,0 +1,8 @@ +@prefix ns0: . +@prefix dc: . + + + a ; + ns0:timestampPath dc:created ; + ns0:versionOfPath dc:isVersionOf ; + ns0:kafkaSource [ ns0:topic "testTopic" ] . \ No newline at end of file diff --git a/ldes-server-admin/ldes-server-admin-common/src/test/resources/eventstream/streams/with-kafka-source/ldes-with-kafkaES-noTopic.ttl b/ldes-server-admin/ldes-server-admin-common/src/test/resources/eventstream/streams/with-kafka-source/ldes-with-kafkaES-noTopic.ttl new file mode 100644 index 0000000000..0d6155e7c1 --- /dev/null +++ b/ldes-server-admin/ldes-server-admin-common/src/test/resources/eventstream/streams/with-kafka-source/ldes-with-kafkaES-noTopic.ttl @@ -0,0 +1,8 @@ +@prefix ns0: . +@prefix dc: . + + + a ; + ns0:timestampPath dc:created ; + ns0:versionOfPath dc:isVersionOf ; + ns0:kafkaSource [ ns0:mimeType "application/n-quads" ] . \ No newline at end of file diff --git a/ldes-server-admin/ldes-server-admin-common/src/test/resources/eventstream/streams/with-kafka-source/ldes-with-kafkaES.ttl b/ldes-server-admin/ldes-server-admin-common/src/test/resources/eventstream/streams/with-kafka-source/ldes-with-kafkaES.ttl new file mode 100644 index 0000000000..6939939f6a --- /dev/null +++ b/ldes-server-admin/ldes-server-admin-common/src/test/resources/eventstream/streams/with-kafka-source/ldes-with-kafkaES.ttl @@ -0,0 +1,9 @@ +@prefix ns0: . +@prefix dc: . + + + a ; + ns0:timestampPath dc:created ; + ns0:versionOfPath dc:isVersionOf ; + ns0:kafkaSource [ ns0:topic "testTopic" ; + ns0:mimeType "application/n-quads" ] . \ No newline at end of file diff --git a/ldes-server-admin/ldes-server-admin-rest/pom.xml b/ldes-server-admin/ldes-server-admin-rest/pom.xml index 5ac2fc9a8d..fbea691f5a 100644 --- a/ldes-server-admin/ldes-server-admin-rest/pom.xml +++ b/ldes-server-admin/ldes-server-admin-rest/pom.xml @@ -5,7 +5,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-admin - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT diff --git a/ldes-server-admin/ldes-server-admin-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/rest/config/SpringIntegrationTest.java b/ldes-server-admin/ldes-server-admin-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/rest/config/SpringIntegrationTest.java index f773dee878..c4a75dedf1 100644 --- a/ldes-server-admin/ldes-server-admin-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/rest/config/SpringIntegrationTest.java +++ b/ldes-server-admin/ldes-server-admin-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/rest/config/SpringIntegrationTest.java @@ -5,6 +5,7 @@ import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventsource.repository.EventSourceRepository; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventsource.services.EventSourceServiceImpl; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.eventstream.repository.EventStreamRepository; +import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.kafkasource.KafkaSourceRepository; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.shacl.repository.ShaclShapeRepository; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.view.repository.DcatViewRepository; import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.view.repository.ViewRepository; @@ -76,6 +77,9 @@ public class SpringIntegrationTest { @MockBean protected ShaclShapeRepository shaclShapeRepository; + @MockBean + protected KafkaSourceRepository kafkaSourceRepository; + @Autowired protected ApplicationEventPublisher eventPublisher; diff --git a/ldes-server-admin/ldes-server-admin-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/rest/controllers/AdminEventStreamsRestControllerTest.java b/ldes-server-admin/ldes-server-admin-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/rest/controllers/AdminEventStreamsRestControllerTest.java index 68bd4356ba..61f532d08e 100644 --- a/ldes-server-admin/ldes-server-admin-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/rest/controllers/AdminEventStreamsRestControllerTest.java +++ b/ldes-server-admin/ldes-server-admin-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/rest/controllers/AdminEventStreamsRestControllerTest.java @@ -59,7 +59,7 @@ @ActiveProfiles({"test", "rest"}) @ContextConfiguration(classes = {AdminEventStreamsRestController.class, HttpModelConverter.class, EventStreamListHttpConverter.class, EventStreamHttpConverter.class, - EventStreamWriter.class, EventStreamReader.class, + EventStreamWriter.class, EventStreamReader.class, KafkaSourceReader.class, ViewSpecificationConverter.class, PrefixAdderImpl.class, ValidatorsConfig.class, AdminRestResponseEntityExceptionHandler.class, RetentionModelExtractor.class, CharsetEncodingConfig.class, FragmentationConfigExtractor.class, PrefixConstructor.class, RdfModelConverter.class, AdminVersionHeaderControllerAdvice.class}) diff --git a/ldes-server-admin/pom.xml b/ldes-server-admin/pom.xml index 58cfe60fcf..ad947b9d71 100644 --- a/ldes-server-admin/pom.xml +++ b/ldes-server-admin/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT pom diff --git a/ldes-server-application/pom.xml b/ldes-server-application/pom.xml index d06ac875f5..8fcbaccce0 100644 --- a/ldes-server-application/pom.xml +++ b/ldes-server-application/pom.xml @@ -5,7 +5,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-application @@ -125,6 +125,21 @@ + + kafka-ingest + + + be.vlaanderen.informatievlaanderen.vsds + ldes-server-ingest-kafka + ${project.version} + + + be.vlaanderen.informatievlaanderen.vsds + postgres-ingest-repository + ${project.version} + + + http-fetch diff --git a/ldes-server-application/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/OpenApiConfig.java b/ldes-server-application/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/OpenApiConfig.java index 42ace0fec1..e1fa690c89 100644 --- a/ldes-server-application/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/OpenApiConfig.java +++ b/ldes-server-application/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/OpenApiConfig.java @@ -2,38 +2,50 @@ import io.swagger.v3.oas.models.info.Info; import org.springdoc.core.models.GroupedOpenApi; +import org.springframework.boot.info.BuildProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class OpenApiConfig { - private static final String VERSION = "3.1.0"; - + private static final String ADMIN_PACKAGE = "be.vlaanderen.informatievlaanderen.ldes.server.admin.rest"; + private static final String KAFKA_DEBUG_PACKAGE = "be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka"; @Bean - public GroupedOpenApi adminGroup() { - String[] packages = {"be.vlaanderen.informatievlaanderen.ldes.server.admin.rest"}; + public GroupedOpenApi adminGroup(BuildProperties buildProperties) { return GroupedOpenApi.builder() .group("admin") .addOpenApiCustomizer(openApi -> openApi.info(new Info() .title("LDES Server Admin API") - .version(VERSION) + .version(buildProperties.getVersion()) .description("This API makes it possible to manage an LDES Server") )) - .packagesToScan(packages) + .packagesToScan(ADMIN_PACKAGE) + .build(); + } + + @Bean + public GroupedOpenApi kafkaDebugGroup(BuildProperties buildProperties) { + return GroupedOpenApi.builder() + .group("kafka-debug") + .addOpenApiCustomizer(openApi -> openApi.info(new Info() + .title("LDES Server Kafka Debug API") + .version(buildProperties.getVersion()) + .description("Purely meant as a developer debugging tool") + )) + .packagesToScan(KAFKA_DEBUG_PACKAGE) .build(); } @Bean - public GroupedOpenApi defaultGroup() { - String[] packages = {"be.vlaanderen.informatievlaanderen.ldes.server.admin.rest"}; + public GroupedOpenApi defaultGroup(BuildProperties buildProperties) { return GroupedOpenApi.builder() .group("base") .addOpenApiCustomizer(openApi -> openApi.info(new Info() .title("Ingest and Fetch endpoints") - .version(VERSION) + .version(buildProperties.getVersion()) .description("These API endpoints are available to ingest and fetch linked data") )) - .packagesToExclude(packages) + .packagesToExclude(ADMIN_PACKAGE, KAFKA_DEBUG_PACKAGE) .build(); } } diff --git a/ldes-server-domain/pom.xml b/ldes-server-domain/pom.xml index 91adc29ce1..954451851b 100644 --- a/ldes-server-domain/pom.xml +++ b/ldes-server-domain/pom.xml @@ -5,7 +5,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-domain diff --git a/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/events/admin/KafkaSourceAddedEvent.java b/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/events/admin/KafkaSourceAddedEvent.java new file mode 100644 index 0000000000..13ba088f37 --- /dev/null +++ b/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/events/admin/KafkaSourceAddedEvent.java @@ -0,0 +1,6 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin; + +import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.KafkaSourceProperties; + +public record KafkaSourceAddedEvent(KafkaSourceProperties kafkaSource) { +} diff --git a/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/model/EventSource.java b/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/model/EventSource.java index ca0cd91c65..4a0ceb3159 100644 --- a/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/model/EventSource.java +++ b/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/model/EventSource.java @@ -5,22 +5,7 @@ import java.util.List; import java.util.Objects; -public class EventSource { - private final String collectionName; - private final List retentionPolicies; - - public EventSource(String collectionName, List retentionPolicies) { - this.collectionName = collectionName; - this.retentionPolicies = retentionPolicies; - } - - public String getCollectionName() { - return collectionName; - } - - public List getRetentionPolicies() { - return retentionPolicies; - } +public record EventSource(String collectionName, List retentionPolicies) { @Override public boolean equals(Object o) { @@ -29,7 +14,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; EventSource eventSource = (EventSource) o; - return collectionName.equals(eventSource.getCollectionName()); + return collectionName.equals(eventSource.collectionName()); } @Override diff --git a/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/model/KafkaSourceProperties.java b/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/model/KafkaSourceProperties.java new file mode 100644 index 0000000000..81cb50cd9f --- /dev/null +++ b/ldes-server-domain/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/domain/model/KafkaSourceProperties.java @@ -0,0 +1,4 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.domain.model; + +public record KafkaSourceProperties (String collection, String topic, String mimeType) { +} diff --git a/ldes-server-fetch/ldes-server-fetch-common/pom.xml b/ldes-server-fetch/ldes-server-fetch-common/pom.xml index 7d86b3ec8d..5562679dd4 100644 --- a/ldes-server-fetch/ldes-server-fetch-common/pom.xml +++ b/ldes-server-fetch/ldes-server-fetch-common/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-fetch - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-fetch-common diff --git a/ldes-server-fetch/ldes-server-fetch-rest/pom.xml b/ldes-server-fetch/ldes-server-fetch-rest/pom.xml index 344da0dd9b..97350ec920 100644 --- a/ldes-server-fetch/ldes-server-fetch-rest/pom.xml +++ b/ldes-server-fetch/ldes-server-fetch-rest/pom.xml @@ -5,7 +5,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-fetch - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-fetch-rest diff --git a/ldes-server-fetch/ldes-server-fetch-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/rest/eventstream/EventStreamControllerTest.java b/ldes-server-fetch/ldes-server-fetch-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/rest/eventstream/EventStreamControllerTest.java index 99a86d731d..4f1e839221 100644 --- a/ldes-server-fetch/ldes-server-fetch-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/rest/eventstream/EventStreamControllerTest.java +++ b/ldes-server-fetch/ldes-server-fetch-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/rest/eventstream/EventStreamControllerTest.java @@ -63,7 +63,7 @@ @ActiveProfiles({"test", "rest"}) @Import({EventStreamControllerTest.EventStreamControllerTestConfiguration.class}) @ContextConfiguration(classes = {EventStreamController.class, RestConfig.class, - RestResponseEntityExceptionHandler.class, EventStreamWriter.class, EventStreamReader.class, + RestResponseEntityExceptionHandler.class, EventStreamWriter.class, EventStreamReader.class, KafkaSourceReader.class, ViewSpecificationConverter.class, PrefixAdderImpl.class, EventStreamResponseHttpConverter.class, RetentionModelExtractor.class, HttpModelConverter.class, FragmentationConfigExtractor.class, PrefixConstructor.class, RdfModelConverter.class, VersionHeaderControllerAdvice.class diff --git a/ldes-server-fetch/pom.xml b/ldes-server-fetch/pom.xml index a25422d5b0..f58a4b0de0 100644 --- a/ldes-server-fetch/pom.xml +++ b/ldes-server-fetch/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT pom ldes-server-fetch diff --git a/ldes-server-fragmentation/ldes-server-fragmentation-common/pom.xml b/ldes-server-fragmentation/ldes-server-fragmentation-common/pom.xml index 3d9ecede3f..bccd16d102 100644 --- a/ldes-server-fragmentation/ldes-server-fragmentation-common/pom.xml +++ b/ldes-server-fragmentation/ldes-server-fragmentation-common/pom.xml @@ -5,7 +5,7 @@ ldes-server-fragmentation be.vlaanderen.informatievlaanderen.vsds - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT 4.0.0 diff --git a/ldes-server-fragmentation/ldes-server-fragmentation-geospatial/pom.xml b/ldes-server-fragmentation/ldes-server-fragmentation-geospatial/pom.xml index 52cbe74d8b..4ca2f0d30a 100644 --- a/ldes-server-fragmentation/ldes-server-fragmentation-geospatial/pom.xml +++ b/ldes-server-fragmentation/ldes-server-fragmentation-geospatial/pom.xml @@ -3,7 +3,7 @@ ldes-server-fragmentation be.vlaanderen.informatievlaanderen.vsds - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT 4.0.0 diff --git a/ldes-server-fragmentation/ldes-server-fragmentation-reference/pom.xml b/ldes-server-fragmentation/ldes-server-fragmentation-reference/pom.xml index 2076ca036f..83509025a8 100644 --- a/ldes-server-fragmentation/ldes-server-fragmentation-reference/pom.xml +++ b/ldes-server-fragmentation/ldes-server-fragmentation-reference/pom.xml @@ -5,7 +5,7 @@ ldes-server-fragmentation be.vlaanderen.informatievlaanderen.vsds - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT 4.0.0 jar diff --git a/ldes-server-fragmentation/ldes-server-fragmentation-timebased-hierarchical/pom.xml b/ldes-server-fragmentation/ldes-server-fragmentation-timebased-hierarchical/pom.xml index 5ffc84b11d..21dc3cd052 100644 --- a/ldes-server-fragmentation/ldes-server-fragmentation-timebased-hierarchical/pom.xml +++ b/ldes-server-fragmentation/ldes-server-fragmentation-timebased-hierarchical/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-fragmentation - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-fragmentation-timebased-hierarchical diff --git a/ldes-server-fragmentation/ldes-server-pagination/pom.xml b/ldes-server-fragmentation/ldes-server-pagination/pom.xml index 2d20f0414f..1a988d21c9 100644 --- a/ldes-server-fragmentation/ldes-server-pagination/pom.xml +++ b/ldes-server-fragmentation/ldes-server-pagination/pom.xml @@ -6,7 +6,7 @@ ldes-server-fragmentation be.vlaanderen.informatievlaanderen.vsds - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-pagination diff --git a/ldes-server-fragmentation/pom.xml b/ldes-server-fragmentation/pom.xml index ad6635d5f2..d3937ff53d 100644 --- a/ldes-server-fragmentation/pom.xml +++ b/ldes-server-fragmentation/pom.xml @@ -5,7 +5,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-fragmentation diff --git a/ldes-server-infra-postgres/pom.xml b/ldes-server-infra-postgres/pom.xml index 081d2f02dd..cb0cf250e1 100644 --- a/ldes-server-infra-postgres/pom.xml +++ b/ldes-server-infra-postgres/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT pom diff --git a/ldes-server-infra-postgres/postgres-admin-repository/pom.xml b/ldes-server-infra-postgres/postgres-admin-repository/pom.xml index b626ed90ae..69714d21c1 100644 --- a/ldes-server-infra-postgres/postgres-admin-repository/pom.xml +++ b/ldes-server-infra-postgres/postgres-admin-repository/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-infra-postgres - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT postgres-admin-repository diff --git a/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventsource/EventSourcePostgresRepository.java b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventsource/EventSourcePostgresRepository.java index e429b519f7..e1b945875c 100644 --- a/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventsource/EventSourcePostgresRepository.java +++ b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventsource/EventSourcePostgresRepository.java @@ -23,10 +23,10 @@ public EventSourcePostgresRepository(EventSourceEntityRepository eventSourceEnti @Override public void saveEventSource(EventSource eventSource) { - eventSourceEntityRepository.findByCollectionName(eventSource.getCollectionName()) - .or(() -> eventStreamEntityRepository.findByName(eventSource.getCollectionName()).map(EventSourceEntity::new)) + eventSourceEntityRepository.findByCollectionName(eventSource.collectionName()) + .or(() -> eventStreamEntityRepository.findByName(eventSource.collectionName()).map(EventSourceEntity::new)) .ifPresent(eventSourceEntity -> { - eventSourceEntity.setRetentionPolicies(eventSource.getRetentionPolicies()); + eventSourceEntity.setRetentionPolicies(eventSource.retentionPolicies()); eventSourceEntityRepository.save(eventSourceEntity); }); } diff --git a/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/EventStreamPostgresRepository.java b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/EventStreamPostgresRepository.java index 44b610f5ff..4b60dcb992 100644 --- a/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/EventStreamPostgresRepository.java +++ b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/EventStreamPostgresRepository.java @@ -47,8 +47,9 @@ public Optional retrieveEventStreamTO(String collectionName) { @Override @Transactional - public void saveEventStream(EventStreamTO eventStreamTO) { - repository.save(EventStreamMapper.toEntity(eventStreamTO)); + public Integer saveEventStream(EventStreamTO eventStreamTO) { + var es = repository.save(EventStreamMapper.toEntity(eventStreamTO)); + return es.getId(); } @Override diff --git a/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/entity/EventStreamEntity.java b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/entity/EventStreamEntity.java index a6c8cb6da5..e9c8f2cc13 100644 --- a/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/entity/EventStreamEntity.java +++ b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/entity/EventStreamEntity.java @@ -115,4 +115,8 @@ public void setShaclShapeEntity(ShaclShapeEntity shaclShapeEntity) { public void setEventSourceEntity(EventSourceEntity eventSourceEntity) { this.eventSourceEntity = eventSourceEntity; } + + public void setId(Integer id) { + this.id = id; + } } diff --git a/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/kafkasource/KafkaSourcePostgresRepository.java b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/kafkasource/KafkaSourcePostgresRepository.java new file mode 100644 index 0000000000..01622a36f0 --- /dev/null +++ b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/kafkasource/KafkaSourcePostgresRepository.java @@ -0,0 +1,33 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.admin.postgres.kafkasource; + +import be.vlaanderen.informatievlaanderen.ldes.server.admin.domain.kafkasource.KafkaSourceRepository; +import be.vlaanderen.informatievlaanderen.ldes.server.admin.postgres.kafkasource.entity.KafkaSourceEntity; +import be.vlaanderen.informatievlaanderen.ldes.server.admin.postgres.kafkasource.repository.KafkaSourceEntityRepository; +import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.KafkaSourceProperties; +import org.springframework.stereotype.Repository; + +import java.util.List; + +@Repository +public class KafkaSourcePostgresRepository implements KafkaSourceRepository { + private final KafkaSourceEntityRepository repository; + + public KafkaSourcePostgresRepository(KafkaSourceEntityRepository repository) { + this.repository = repository; + } + + @Override + public void save(KafkaSourceProperties kafkaSource, Integer eventStreamId) { + repository.save(new KafkaSourceEntity(eventStreamId, kafkaSource.collection(), kafkaSource.topic(), kafkaSource.mimeType())); + } + + @Override + public List getAll() { + return repository.findAll().stream() + .map(kafkaSourceEntity -> + new KafkaSourceProperties(kafkaSourceEntity.getCollection(), + kafkaSourceEntity.getTopic(), + kafkaSourceEntity.getMimeType())) + .toList(); + } +} diff --git a/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/kafkasource/entity/KafkaSourceEntity.java b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/kafkasource/entity/KafkaSourceEntity.java new file mode 100644 index 0000000000..f44f2dbaa8 --- /dev/null +++ b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/kafkasource/entity/KafkaSourceEntity.java @@ -0,0 +1,42 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.admin.postgres.kafkasource.entity; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +@Entity +@Table(name = "collection_kafka_sources") +public class KafkaSourceEntity { + @Id + @Column(name = "collection_id") + private Integer collectionId; + @Column(name = "collection") + private String collection; + @Column(name = "topic") + private String topic; + @Column(name = "mime_type") + private String mimeType; + + public KafkaSourceEntity() { + } + + public KafkaSourceEntity(Integer collectionId, String collection, String topic, String mimeType) { + this.collectionId = collectionId; + this.collection = collection; + this.topic = topic; + this.mimeType = mimeType; + } + + public String getCollection() { + return collection; + } + + public String getTopic() { + return topic; + } + + public String getMimeType() { + return mimeType; + } +} diff --git a/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/kafkasource/repository/KafkaSourceEntityRepository.java b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/kafkasource/repository/KafkaSourceEntityRepository.java new file mode 100644 index 0000000000..d6b0b08151 --- /dev/null +++ b/ldes-server-infra-postgres/postgres-admin-repository/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/kafkasource/repository/KafkaSourceEntityRepository.java @@ -0,0 +1,10 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.admin.postgres.kafkasource.repository; + +import be.vlaanderen.informatievlaanderen.ldes.server.admin.postgres.kafkasource.entity.KafkaSourceEntity; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface KafkaSourceEntityRepository extends JpaRepository { + +} diff --git a/ldes-server-infra-postgres/postgres-admin-repository/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventsource/EventSourcePostgresRepositoryTest.java b/ldes-server-infra-postgres/postgres-admin-repository/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventsource/EventSourcePostgresRepositoryTest.java index b24e5428b8..2684c4f5f1 100644 --- a/ldes-server-infra-postgres/postgres-admin-repository/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventsource/EventSourcePostgresRepositoryTest.java +++ b/ldes-server-infra-postgres/postgres-admin-repository/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventsource/EventSourcePostgresRepositoryTest.java @@ -68,7 +68,7 @@ void given_MultipleEventSources_when_GetAllEventSources_then_ReturnList() { final var result = eventSourcePostgresRepository.getAllEventSources(); assertThat(result) - .map(EventSource::getCollectionName) + .map(EventSource::collectionName) .containsExactlyInAnyOrder(COLLECTION_NAME, otherCollectionName); } diff --git a/ldes-server-infra-postgres/postgres-admin-repository/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/EventStreamPostgresRepositoryTest.java b/ldes-server-infra-postgres/postgres-admin-repository/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/EventStreamPostgresRepositoryTest.java index a5d40906f8..b3c1127aa3 100644 --- a/ldes-server-infra-postgres/postgres-admin-repository/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/EventStreamPostgresRepositoryTest.java +++ b/ldes-server-infra-postgres/postgres-admin-repository/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/admin/postgres/eventstream/EventStreamPostgresRepositoryTest.java @@ -23,6 +23,7 @@ import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.assertArg; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -179,6 +180,7 @@ void when_emptyDbQueried_then_returnEmptyOptional() { @Test void test_insertion() { final EventStreamEntity expectedEntity = createEventStreamEntity(COLLECTION_NAME); + when(eventStreamEntityRepository.save(any())).thenReturn(expectedEntity); repository.saveEventStream(EVENT_STREAM_TO); @@ -204,6 +206,7 @@ private static EventStreamEntity createEventStreamEntity(String collection) { false, SKOLEMIZATION_DOMAIN ); + eventStreamEntity.setId(1); eventStreamEntity.setShaclShapeEntity(new ShaclShapeEntity(eventStreamEntity, ModelFactory.createDefaultModel())); eventStreamEntity.setViews(List.of()); eventStreamEntity.setEventSourceEntity(new EventSourceEntity(eventStreamEntity, List.of())); diff --git a/ldes-server-infra-postgres/postgres-fetch-repository/pom.xml b/ldes-server-infra-postgres/postgres-fetch-repository/pom.xml index 7dc8d274fb..f7ce9c3eb5 100644 --- a/ldes-server-infra-postgres/postgres-fetch-repository/pom.xml +++ b/ldes-server-infra-postgres/postgres-fetch-repository/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-infra-postgres - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT postgres-fetch-repository @@ -20,7 +20,7 @@ be.vlaanderen.informatievlaanderen.vsds postgres-admin-repository - 3.5.0-SNAPSHOT + ${project.version} compile diff --git a/ldes-server-infra-postgres/postgres-fragmentation-repository/pom.xml b/ldes-server-infra-postgres/postgres-fragmentation-repository/pom.xml index e63b14d8ae..4d37d69d72 100644 --- a/ldes-server-infra-postgres/postgres-fragmentation-repository/pom.xml +++ b/ldes-server-infra-postgres/postgres-fragmentation-repository/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-infra-postgres - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT postgres-fragmentation-repository diff --git a/ldes-server-infra-postgres/postgres-ingest-repository/pom.xml b/ldes-server-infra-postgres/postgres-ingest-repository/pom.xml index 197593f17a..f060934c29 100644 --- a/ldes-server-infra-postgres/postgres-ingest-repository/pom.xml +++ b/ldes-server-infra-postgres/postgres-ingest-repository/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-infra-postgres - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT postgres-ingest-repository diff --git a/ldes-server-infra-postgres/postgres-liquibase/pom.xml b/ldes-server-infra-postgres/postgres-liquibase/pom.xml index 587cae58f8..7f9c8544a8 100644 --- a/ldes-server-infra-postgres/postgres-liquibase/pom.xml +++ b/ldes-server-infra-postgres/postgres-liquibase/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-infra-postgres - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT postgres-liquibase diff --git a/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/3_6_0/init-collection_kafka_sources.xml b/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/3_6_0/init-collection_kafka_sources.xml new file mode 100644 index 0000000000..24da24e447 --- /dev/null +++ b/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/3_6_0/init-collection_kafka_sources.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/3_6_0/master.xml b/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/3_6_0/master.xml new file mode 100644 index 0000000000..e460f8ee1c --- /dev/null +++ b/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/3_6_0/master.xml @@ -0,0 +1,8 @@ + + + + + \ No newline at end of file diff --git a/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/master.xml b/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/master.xml index 5d260ea502..c337d8291b 100644 --- a/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/master.xml +++ b/ldes-server-infra-postgres/postgres-liquibase/src/main/resources/db/changelog/master.xml @@ -10,5 +10,6 @@ + \ No newline at end of file diff --git a/ldes-server-infra-postgres/postgres-maintenance-repository/pom.xml b/ldes-server-infra-postgres/postgres-maintenance-repository/pom.xml index 88190c284d..2febcae196 100644 --- a/ldes-server-infra-postgres/postgres-maintenance-repository/pom.xml +++ b/ldes-server-infra-postgres/postgres-maintenance-repository/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-infra-postgres - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT postgres-maintenance-repository diff --git a/ldes-server-infra-postgres/postgres-pagination-repository/pom.xml b/ldes-server-infra-postgres/postgres-pagination-repository/pom.xml index dec00259bb..0da9b5435d 100644 --- a/ldes-server-infra-postgres/postgres-pagination-repository/pom.xml +++ b/ldes-server-infra-postgres/postgres-pagination-repository/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-infra-postgres - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT postgres-pagination-repository diff --git a/ldes-server-ingest/ldes-server-ingest-common/pom.xml b/ldes-server-ingest/ldes-server-ingest-common/pom.xml index 2813270599..fe859def38 100644 --- a/ldes-server-ingest/ldes-server-ingest-common/pom.xml +++ b/ldes-server-ingest/ldes-server-ingest-common/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-ingest - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-ingest-common diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/IngestValidator.java b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/IngestValidator.java similarity index 62% rename from ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/IngestValidator.java rename to ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/IngestValidator.java index 7284f1d444..3250cd2714 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/IngestValidator.java +++ b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/IngestValidator.java @@ -1,4 +1,4 @@ -package be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators; +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators; import org.apache.jena.rdf.model.Model; diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/BlankNodesValidator.java b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/BlankNodesValidator.java similarity index 96% rename from ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/BlankNodesValidator.java rename to ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/BlankNodesValidator.java index 5c5b7569a7..9adcbaed81 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/BlankNodesValidator.java +++ b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/BlankNodesValidator.java @@ -1,4 +1,4 @@ -package be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator; +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; import org.apache.jena.rdf.model.Model; diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/IngestReportValidator.java b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/IngestReportValidator.java similarity index 73% rename from ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/IngestReportValidator.java rename to ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/IngestReportValidator.java index c1ea43f9a6..bce036caae 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/IngestReportValidator.java +++ b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/IngestReportValidator.java @@ -1,4 +1,4 @@ -package be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator; +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; import org.apache.jena.rdf.model.Model; diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/PathsValidator.java b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/PathsValidator.java similarity index 97% rename from ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/PathsValidator.java rename to ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/PathsValidator.java index 5c47edfbfd..981301b9d8 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/PathsValidator.java +++ b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/PathsValidator.java @@ -1,4 +1,4 @@ -package be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator; +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; import org.apache.jena.datatypes.RDFDatatype; diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/ShaclReportManager.java b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/ShaclReportManager.java similarity index 93% rename from ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/ShaclReportManager.java rename to ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/ShaclReportManager.java index 7b93d5059a..b10afb748b 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/ingestreportvalidator/ShaclReportManager.java +++ b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/ingestreportvalidator/ShaclReportManager.java @@ -1,4 +1,4 @@ -package be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator; +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator; import org.apache.jena.graph.NodeFactory; import org.apache.jena.rdf.model.Resource; diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/memberingestvalidator/MemberIngestValidator.java b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/memberingestvalidator/MemberIngestValidator.java similarity index 88% rename from ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/memberingestvalidator/MemberIngestValidator.java rename to ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/memberingestvalidator/MemberIngestValidator.java index 4e9d98c730..e4246cc4fa 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/memberingestvalidator/MemberIngestValidator.java +++ b/ldes-server-ingest/ldes-server-ingest-common/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/validators/memberingestvalidator/MemberIngestValidator.java @@ -1,4 +1,4 @@ -package be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.memberingestvalidator; +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.memberingestvalidator; import be.vlaanderen.informatievlaanderen.ldes.server.domain.converter.RdfModelConverter; import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.EventStreamClosedEvent; @@ -6,9 +6,9 @@ import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.EventStreamDeletedEvent; import be.vlaanderen.informatievlaanderen.ldes.server.domain.exceptions.ShaclValidationException; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.IngestValidator; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator.IngestReportValidator; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator.ShaclReportManager; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.IngestValidator; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator.IngestReportValidator; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator.ShaclReportManager; import org.apache.jena.rdf.model.Model; import org.apache.jena.riot.Lang; import org.apache.jena.shacl.ValidationReport; diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/pom.xml b/ldes-server-ingest/ldes-server-ingest-kafka/pom.xml new file mode 100644 index 0000000000..1babd76530 --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/pom.xml @@ -0,0 +1,44 @@ + + + 4.0.0 + + be.vlaanderen.informatievlaanderen.vsds + ldes-server-ingest + 3.6.0-SNAPSHOT + + + ldes-server-ingest-kafka + + + + be.vlaanderen.informatievlaanderen.vsds + ldes-server-ingest-common + ${project.version} + + + org.springframework.kafka + spring-kafka + ${spring.kafka.version} + + + + + org.junit.jupiter + junit-jupiter + test + + + + + ${project.artifactId} + + + org.apache.maven.plugins + maven-assembly-plugin + + + + + \ No newline at end of file diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/KafkaListenerContainerManager.java b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/KafkaListenerContainerManager.java new file mode 100644 index 0000000000..d4948eca46 --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/KafkaListenerContainerManager.java @@ -0,0 +1,89 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka; + +import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.EventStreamDeletedEvent; +import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.KafkaSourceAddedEvent; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.MemberIngester; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.listener.IngestListener; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.IngestValidator; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFLanguages; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.event.EventListener; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpoint; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.config.MethodKafkaListenerEndpoint; +import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.Optional; +import java.util.UUID; + +@Component +public class KafkaListenerContainerManager { + private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + private final KafkaListenerContainerFactory> kafkaListenerContainerFactory; + private final KafkaProperties kafkaProperties; + private final IngestValidator ingestValidator; + private final MemberIngester memberIngester; + + public KafkaListenerContainerManager(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry, + KafkaListenerContainerFactory> kafkaListenerContainerFactory, + KafkaProperties kafkaProperties, IngestValidator ingestValidator, + MemberIngester memberIngester) { + this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry; + this.kafkaListenerContainerFactory = kafkaListenerContainerFactory; + this.kafkaProperties = kafkaProperties; + this.ingestValidator = ingestValidator; + this.memberIngester = memberIngester; + } + + public void registerListener(String listenerId, String collection, String topic, String mimeType) throws NoSuchMethodException { + kafkaListenerEndpointRegistry.registerListenerContainer( + createKafkaListenerEndpoint(listenerId, collection, topic, mimeType), kafkaListenerContainerFactory, true + ); + } + + public Collection listContainers() { + return kafkaListenerEndpointRegistry.getListenerContainers(); + } + + public Optional getContainer(String listenerId) { + return Optional.ofNullable(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)); + } + + public void unregisterListener(String listenerId) { + kafkaListenerEndpointRegistry.unregisterListenerContainer(listenerId); + } + + @EventListener(KafkaSourceAddedEvent.class) + public void onKafkaSourceAdded(KafkaSourceAddedEvent event) throws NoSuchMethodException { + registerListener(UUID.randomUUID().toString(), event.kafkaSource().collection(), event.kafkaSource().topic(), event.kafkaSource().mimeType()); + } + + @EventListener(EventStreamDeletedEvent.class) + public void onEventStreamDeleted(EventStreamDeletedEvent event) { + unregisterListener(event.collectionName()); + } + + private KafkaListenerEndpoint createKafkaListenerEndpoint(String listenerId, String collection, String topic, String mimeType) throws NoSuchMethodException { + MethodKafkaListenerEndpoint kafkaListenerEndpoint = new MethodKafkaListenerEndpoint<>(); + kafkaListenerEndpoint.setId(listenerId); + kafkaListenerEndpoint.setGroupId(kafkaProperties.getConsumer().getGroupId()); + kafkaListenerEndpoint.setAutoStartup(true); + kafkaListenerEndpoint.setTopics(topic); + kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); + + Lang lang = RDFLanguages.contentTypeToLang(mimeType); + + kafkaListenerEndpoint.setBean(new IngestListener(collection, lang, ingestValidator, memberIngester)); + + kafkaListenerEndpoint.setMethod(IngestListener.class.getMethod("onMessage", ConsumerRecord.class, Acknowledgment.class)); + return kafkaListenerEndpoint; + } +} diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/config/KafkaConfig.java b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/config/KafkaConfig.java new file mode 100644 index 0000000000..11edc2fd72 --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/config/KafkaConfig.java @@ -0,0 +1,21 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; + +@Configuration +public class KafkaConfig { + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return factory; + } +} \ No newline at end of file diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/controller/KafkaConsumerController.java b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/controller/KafkaConsumerController.java new file mode 100644 index 0000000000..8fa17dbdc3 --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/controller/KafkaConsumerController.java @@ -0,0 +1,127 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.controller; + +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.KafkaListenerContainerManager; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.exception.KafkaConsumerException; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.model.KafkaConsumerAssignment; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.model.KafkaConsumerRequest; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.model.KafkaConsumerResponse; +import org.apache.kafka.common.TopicPartition; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +@RestController +@RequestMapping(path = "/consumers") +public class KafkaConsumerController { + private final KafkaListenerContainerManager kafkaListenerContainerManager; + + public KafkaConsumerController(KafkaListenerContainerManager kafkaListenerContainerManager) { + this.kafkaListenerContainerManager = kafkaListenerContainerManager; + } + + @PostMapping + public void create(@RequestBody KafkaConsumerRequest req) throws NoSuchMethodException { + kafkaListenerContainerManager.registerListener( + UUID.randomUUID().toString(), + req.collection(), + req.topic(), + req.mimeType() + ); + } + + @GetMapping + public List list() { + return kafkaListenerContainerManager.listContainers() + .stream() + .map(this::createKafkaConsumerResponse) + .toList(); + } + + @GetMapping(path="/{listenerId}") + public KafkaConsumerResponse get(@PathVariable String listenerId) { + return createKafkaConsumerResponse(getListenerContainer(listenerId)); + } + + @PutMapping(path = "/{listenerId}/activate") + public void activate(@PathVariable String listenerId) { + MessageListenerContainer listenerContainer = getListenerContainer(listenerId); + + if (listenerContainer.isRunning()) { + throw new KafkaConsumerException("Consumer is already running : " + listenerId); + } + + listenerContainer.start(); + } + + @PutMapping(path = "/{listenerId}/pause") + public void pause(@PathVariable String listenerId) { + MessageListenerContainer listenerContainer = getListenerContainer(listenerId); + if (!listenerContainer.isRunning()) { + throw new KafkaConsumerException("Consumer is not running: " + listenerId); + } else if (listenerContainer.isContainerPaused()) { + throw new KafkaConsumerException("Consumer is already paused: " + listenerId); + } else if (listenerContainer.isPauseRequested()) { + throw new KafkaConsumerException("Consumer pause is already requested: " + listenerId); + } + listenerContainer.pause(); + } + + @PutMapping(path = "/{listenerId}/resume") + public void resume(@PathVariable String listenerId) { + MessageListenerContainer listenerContainer = getListenerContainer(listenerId); + if (!listenerContainer.isRunning()) { + throw new KafkaConsumerException("Consumer is not running: " + listenerId); + } else if (!listenerContainer.isContainerPaused()) { + throw new KafkaConsumerException("Consumer is not paused: " + listenerId); + } + listenerContainer.resume(); + } + + @PutMapping(path = "/{listenerId}/stop") + public void stop(@PathVariable String listenerId) { + MessageListenerContainer listenerContainer = getListenerContainer(listenerId); + if (!listenerContainer.isRunning()) { + throw new KafkaConsumerException("Consumer is already stopped: " + listenerId); + } + listenerContainer.stop(); + } + + @DeleteMapping(path = "{listenerId}") + public void delete(@PathVariable String listenerId) { + MessageListenerContainer listenerContainer = getListenerContainer(listenerId); + listenerContainer.stop(); + kafkaListenerContainerManager.unregisterListener(listenerId); + } + + private MessageListenerContainer getListenerContainer(String listenerId) { + Optional listenerContainerOpt = kafkaListenerContainerManager.getContainer(listenerId); + if (listenerContainerOpt.isEmpty()) { + throw new KafkaConsumerException("No such consumer: " + listenerId); + } + + return listenerContainerOpt.get(); + } + + private KafkaConsumerResponse createKafkaConsumerResponse(MessageListenerContainer listenerContainer) { + return KafkaConsumerResponse.builder() + .groupId(listenerContainer.getGroupId()) + .listenerId(listenerContainer.getListenerId()) + .active(listenerContainer.isRunning()) + .assignments(Optional.ofNullable(listenerContainer.getAssignedPartitions()) + .map(topicPartitions -> topicPartitions.stream() + .map(this::createKafkaConsumerAssignmentResponse) + .toList()) + .orElse(null)) + .build(); + } + + private KafkaConsumerAssignment createKafkaConsumerAssignmentResponse(TopicPartition topicPartition) { + return KafkaConsumerAssignment.builder() + .topic(topicPartition.topic()) + .partition(topicPartition.partition()) + .build(); + } +} diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/exception/KafkaConsumerException.java b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/exception/KafkaConsumerException.java new file mode 100644 index 0000000000..2361ee8196 --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/exception/KafkaConsumerException.java @@ -0,0 +1,8 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.exception; + +public class KafkaConsumerException extends RuntimeException { + public KafkaConsumerException(String message) { + super(message); + + } +} diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/listener/IngestListener.java b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/listener/IngestListener.java new file mode 100644 index 0000000000..6d37e70062 --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/listener/IngestListener.java @@ -0,0 +1,41 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.listener; + +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.MemberIngester; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.IngestValidator; +import org.apache.jena.riot.Lang; +import org.apache.jena.riot.RDFParser; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.listener.AcknowledgingMessageListener; +import org.springframework.kafka.support.Acknowledgment; + +public class IngestListener implements AcknowledgingMessageListener { + Logger log = LoggerFactory.getLogger(IngestListener.class); + + private final String collection; + private final Lang lang; + private final IngestValidator validator; + private final MemberIngester memberIngester; + + public IngestListener(String collection, Lang lang, IngestValidator validator, MemberIngester memberIngester) { + this.collection = collection; + this.lang = lang; + this.validator = validator; + this.memberIngester = memberIngester; + } + + @Override + public void onMessage(ConsumerRecord data, Acknowledgment ack) { + try { + var model = RDFParser.fromString(data.value(), lang).toModel(); + + validator.validate(model, collection); + memberIngester.ingest(collection, model); + + ack.acknowledge(); + } catch (Exception e) { + log.error(e.toString()); + } + } +} diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/model/KafkaConsumerAssignment.java b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/model/KafkaConsumerAssignment.java new file mode 100644 index 0000000000..2dafc00c9d --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/model/KafkaConsumerAssignment.java @@ -0,0 +1,26 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.model; + +public record KafkaConsumerAssignment(String topic, Integer partition) { + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String topic; + private Integer partition; + + public Builder topic(String topic) { + this.topic = topic; + return this; + } + + public Builder partition(Integer partition) { + this.partition = partition; + return this; + } + + public KafkaConsumerAssignment build() { + return new KafkaConsumerAssignment(topic, partition); + } + } +} diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/model/KafkaConsumerRequest.java b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/model/KafkaConsumerRequest.java new file mode 100644 index 0000000000..022ad92cd4 --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/model/KafkaConsumerRequest.java @@ -0,0 +1,4 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.model; + +public record KafkaConsumerRequest(String collection, String topic, String mimeType) { +} diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/model/KafkaConsumerResponse.java b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/model/KafkaConsumerResponse.java new file mode 100644 index 0000000000..76e5c97559 --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/model/KafkaConsumerResponse.java @@ -0,0 +1,40 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.model; + +import java.util.List; + +public record KafkaConsumerResponse(String listenerId, String groupId, Boolean active, List assignments) { + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private String listenerId; + private String groupId; + private Boolean active; + private List assignments; + + public Builder listenerId(String listenerId) { + this.listenerId = listenerId; + return this; + } + + public Builder groupId(String groupId) { + this.groupId = groupId; + return this; + } + + public Builder active(Boolean active) { + this.active = active; + return this; + } + + public Builder assignments(List assignments) { + this.assignments = assignments; + return this; + } + + public KafkaConsumerResponse build() { + return new KafkaConsumerResponse(listenerId, groupId, active, assignments); + } + } +} diff --git a/ldes-server-ingest/ldes-server-ingest-kafka/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/controller/KafkaConsumerControllerTest.java b/ldes-server-ingest/ldes-server-ingest-kafka/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/controller/KafkaConsumerControllerTest.java new file mode 100644 index 0000000000..1e5cbfdf8c --- /dev/null +++ b/ldes-server-ingest/ldes-server-ingest-kafka/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/kafka/controller/KafkaConsumerControllerTest.java @@ -0,0 +1,100 @@ +package be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.controller; + +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.KafkaListenerContainerManager; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.model.KafkaConsumerRequest; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.kafka.model.KafkaConsumerResponse; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.listener.MessageListenerContainer; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.*; + +class KafkaConsumerControllerTest { + + private KafkaListenerContainerManager kafkaListenerContainerManager; + private KafkaConsumerController kafkaConsumerController; + + @BeforeEach + void setUp() { + kafkaListenerContainerManager = mock(KafkaListenerContainerManager.class); + kafkaConsumerController = new KafkaConsumerController(kafkaListenerContainerManager); + } + + @Test + void create() throws NoSuchMethodException { + KafkaConsumerRequest request = new KafkaConsumerRequest("collection", "topic", "mimeType"); + kafkaConsumerController.create(request); + verify(kafkaListenerContainerManager, times(1)).registerListener(anyString(), eq("collection"), eq("topic"), eq("mimeType")); + } + + @Test + void list() { + when(kafkaListenerContainerManager.listContainers()).thenReturn(Collections.emptyList()); + List response = kafkaConsumerController.list(); + assertTrue(response.isEmpty()); + } + + @Test + void get() { + MessageListenerContainer container = mock(MessageListenerContainer.class); + when(container.getListenerId()).thenReturn(UUID.randomUUID().toString()); + when(kafkaListenerContainerManager.getContainer(anyString())).thenReturn(Optional.of(container)); + KafkaConsumerResponse response = kafkaConsumerController.get("listenerId"); + assertNotNull(response); + } + + @Test + void activate() { + MessageListenerContainer container = mock(MessageListenerContainer.class); + when(container.isRunning()).thenReturn(false); + when(kafkaListenerContainerManager.getContainer(anyString())).thenReturn(Optional.of(container)); + kafkaConsumerController.activate("listenerId"); + verify(container, times(1)).start(); + } + + @Test + void pause() { + MessageListenerContainer container = mock(MessageListenerContainer.class); + when(container.isRunning()).thenReturn(true); + when(container.isContainerPaused()).thenReturn(false); + when(container.isPauseRequested()).thenReturn(false); + when(kafkaListenerContainerManager.getContainer(anyString())).thenReturn(Optional.of(container)); + kafkaConsumerController.pause("listenerId"); + verify(container, times(1)).pause(); + } + + @Test + void resume() { + MessageListenerContainer container = mock(MessageListenerContainer.class); + when(container.isRunning()).thenReturn(true); + when(container.isContainerPaused()).thenReturn(true); + when(kafkaListenerContainerManager.getContainer(anyString())).thenReturn(Optional.of(container)); + kafkaConsumerController.resume("listenerId"); + verify(container, times(1)).resume(); + } + + @Test + void stop() { + MessageListenerContainer container = mock(MessageListenerContainer.class); + when(container.isRunning()).thenReturn(true); + when(kafkaListenerContainerManager.getContainer(anyString())).thenReturn(Optional.of(container)); + kafkaConsumerController.stop("listenerId"); + verify(container, times(1)).stop(); + } + + @Test + void delete() { + MessageListenerContainer container = mock(MessageListenerContainer.class); + when(kafkaListenerContainerManager.getContainer(anyString())).thenReturn(Optional.of(container)); + kafkaConsumerController.delete("listenerId"); + verify(container, times(1)).stop(); + verify(kafkaListenerContainerManager, times(1)).unregisterListener("listenerId"); + } +} \ No newline at end of file diff --git a/ldes-server-ingest/ldes-server-ingest-rest/pom.xml b/ldes-server-ingest/ldes-server-ingest-rest/pom.xml index 38761e76fb..a5eb880371 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/pom.xml +++ b/ldes-server-ingest/ldes-server-ingest-rest/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-ingest - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-ingest-rest diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/MemberIngestController.java b/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/MemberIngestController.java index 6463ddd9f3..87d03cfe9c 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/MemberIngestController.java +++ b/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/MemberIngestController.java @@ -1,7 +1,7 @@ package be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest; import be.vlaanderen.informatievlaanderen.ldes.server.ingest.MemberIngester; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.IngestValidator; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.IngestValidator; import io.micrometer.observation.annotation.Observed; import org.apache.jena.rdf.model.Model; import org.springframework.http.HttpStatus; diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/collection/VersionOfPathCollection.java b/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/collection/VersionOfPathCollection.java deleted file mode 100644 index b9336ed0c5..0000000000 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/collection/VersionOfPathCollection.java +++ /dev/null @@ -1,32 +0,0 @@ -package be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.collection; - -import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.EventStreamCreatedEvent; -import be.vlaanderen.informatievlaanderen.ldes.server.domain.events.admin.EventStreamDeletedEvent; -import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -@Component -public class VersionOfPathCollection { - private final Map versionOfPaths = new HashMap<>(); - - @EventListener - public void handleEventStreamCreatedEvent(EventStreamCreatedEvent event) { - EventStream eventStream = event.eventStream(); - versionOfPaths.put(eventStream.getCollection(), eventStream.getVersionOfPath()); - } - - @EventListener - public void handleEventStreamDeletedEvent(EventStreamDeletedEvent event) { - versionOfPaths.remove(event.collectionName()); - } - - public Optional getVersionOfPath(String collectionName) { - return Optional.ofNullable(versionOfPaths.get(collectionName)); - } -} - diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/exception/IngestionRestResponseEntityExceptionHandler.java b/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/exception/IngestionRestResponseEntityExceptionHandler.java index 0ca41da82f..238daa60de 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/exception/IngestionRestResponseEntityExceptionHandler.java +++ b/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/exception/IngestionRestResponseEntityExceptionHandler.java @@ -16,14 +16,6 @@ @ControllerAdvice public class IngestionRestResponseEntityExceptionHandler extends ResponseEntityExceptionHandler { - @ExceptionHandler(value = { MemberIdNotFoundException.class }) - protected ResponseEntity handleGeneralException( - RuntimeException ex, WebRequest request) { - logger.error(ex.getMessage()); - String bodyOfResponse = ex.getMessage(); - return handleExceptionInternal(ex, bodyOfResponse, new HttpHeaders(), HttpStatus.BAD_REQUEST, request); - } - @ExceptionHandler(value = { ShaclValidationException.class }) protected ResponseEntity handleShaclValidationException( ShaclValidationException ex, WebRequest request) { diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/exception/MemberIdNotFoundException.java b/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/exception/MemberIdNotFoundException.java deleted file mode 100644 index ef4fec890c..0000000000 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/main/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/exception/MemberIdNotFoundException.java +++ /dev/null @@ -1,21 +0,0 @@ -package be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.exception; - -import org.apache.jena.rdf.model.Model; -import org.apache.jena.riot.Lang; -import org.apache.jena.riot.RDFWriter; - -public class MemberIdNotFoundException extends RuntimeException { - - private final transient Model model; - - public MemberIdNotFoundException(Model model) { - this.model = model; - } - - @Override - public String getMessage() { - final String modelAsString = RDFWriter.source(model).lang(Lang.TURTLE).asString(); - return "Member id could not be extracted from model: %n%n%s".formatted(modelAsString); - } - -} diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/MemberIngestControllerTest.java b/ldes-server-ingest/ldes-server-ingest-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/MemberIngestControllerTest.java index 48d276b2b0..11bdcbf754 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/MemberIngestControllerTest.java +++ b/ldes-server-ingest/ldes-server-ingest-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/MemberIngestControllerTest.java @@ -7,12 +7,12 @@ import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.VersionCreationProperties; import be.vlaanderen.informatievlaanderen.ldes.server.ingest.MemberIngester; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.exceptions.MemberSubjectNotFoundException; import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.converters.IngestedModelConverter; import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.exception.IngestionRestResponseEntityExceptionHandler; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.exception.MemberIdNotFoundException; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator.BlankNodesValidator; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator.PathsValidator; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.memberingestvalidator.MemberIngestValidator; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator.BlankNodesValidator; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator.PathsValidator; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.memberingestvalidator.MemberIngestValidator; import org.apache.jena.rdf.model.Model; import org.apache.jena.rdf.model.ModelFactory; import org.apache.jena.riot.Lang; @@ -90,7 +90,7 @@ void when_POSTRequestIsPerformedWithMultipleNamedNodes_ThrowException() throws E byte[] ldesMemberBytes = readLdesMemberDataFromFile("example-ldes-member-with-multiple-nodes.nq", Lang.NQUADS); when(memberIngester.ingest(eq("mobility-hindrances"), any())) - .thenThrow(new MemberIdNotFoundException(ModelFactory.createDefaultModel())); + .thenThrow(new MemberSubjectNotFoundException(ModelFactory.createDefaultModel())); mockMvc.perform(post("/mobility-hindrances") .contentType("application/n-quads") @@ -116,7 +116,7 @@ void when_POSTRequestIsPerformed_WithoutVersionOf_ThenTheRequestFails() throws E Lang.NQUADS); when(memberIngester.ingest(eq("mobility-hindrances"), any())) - .thenThrow(new MemberIdNotFoundException(ModelFactory.createDefaultModel())); + .thenThrow(new MemberSubjectNotFoundException(ModelFactory.createDefaultModel())); mockMvc.perform(post("/mobility-hindrances").contentType("application/n-quads").content(ldesMemberBytes)) .andExpect(status().isBadRequest()) diff --git a/ldes-server-ingest/ldes-server-ingest-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/memberingestvalidator/MemberIngestValidatorTest.java b/ldes-server-ingest/ldes-server-ingest-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/memberingestvalidator/MemberIngestValidatorTest.java index aa57a3e177..fb66db41a2 100644 --- a/ldes-server-ingest/ldes-server-ingest-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/memberingestvalidator/MemberIngestValidatorTest.java +++ b/ldes-server-ingest/ldes-server-ingest-rest/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/ingest/rest/validators/memberingestvalidator/MemberIngestValidatorTest.java @@ -5,8 +5,9 @@ import be.vlaanderen.informatievlaanderen.ldes.server.domain.exceptions.ShaclValidationException; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.EventStream; import be.vlaanderen.informatievlaanderen.ldes.server.domain.model.VersionCreationProperties; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator.BlankNodesValidator; -import be.vlaanderen.informatievlaanderen.ldes.server.ingest.rest.validators.ingestreportvalidator.PathsValidator; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator.BlankNodesValidator; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.ingestreportvalidator.PathsValidator; +import be.vlaanderen.informatievlaanderen.ldes.server.ingest.validators.memberingestvalidator.MemberIngestValidator; import org.apache.jena.datatypes.xsd.XSDDatatype; import org.apache.jena.rdf.model.Model; import org.apache.jena.riot.RDFDataMgr; @@ -25,7 +26,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; class MemberIngestValidatorTest { private static final String TIMESTAMP_PATH = "http://purl.org/dc/terms/created"; diff --git a/ldes-server-ingest/pom.xml b/ldes-server-ingest/pom.xml index 6ae54b0b38..9a0592ec5f 100644 --- a/ldes-server-ingest/pom.xml +++ b/ldes-server-ingest/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT pom @@ -15,6 +15,7 @@ ldes-server-ingest-common ldes-server-ingest-rest + ldes-server-ingest-kafka \ No newline at end of file diff --git a/ldes-server-integration-test/pom.xml b/ldes-server-integration-test/pom.xml index 0909ef80ea..b29bfe3b54 100644 --- a/ldes-server-integration-test/pom.xml +++ b/ldes-server-integration-test/pom.xml @@ -6,11 +6,17 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-integration-test + + 1.18.3 + 3.6.0 + 1.18.3 + + @@ -84,6 +90,13 @@ test + + be.vlaanderen.informatievlaanderen.vsds + ldes-server-ingest-kafka + ${project.version} + test + + @@ -130,6 +143,27 @@ org.awaitility awaitility + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + + org.testcontainers + kafka + ${kafka.version} + test + + + org.apache.kafka + kafka-clients + ${kafka-clients.version} + test + + \ No newline at end of file diff --git a/ldes-server-integration-test/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/IngestKafkaSteps.java b/ldes-server-integration-test/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/IngestKafkaSteps.java new file mode 100644 index 0000000000..1ffd16fc61 --- /dev/null +++ b/ldes-server-integration-test/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/IngestKafkaSteps.java @@ -0,0 +1,80 @@ +package be.vlaanderen.informatievlaanderen.ldes.server; + +import io.cucumber.java.en.Given; +import io.cucumber.java.en.When; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.errors.TopicExistsException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +public class IngestKafkaSteps extends LdesServerIntegrationTest { + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Given("I have a Kafka Container running") + public void iHaveAKafkaContainerRunning() { + assertThat(kafka.isRunning()).isTrue(); + } + + @When("I add {int} members of template {string} to the Kafka topic {string}") + public void iAddMembersOfTemplateToTheKafkaTopic(int numberOfMembers, String memberTemplate, String topic) throws IOException, URISyntaxException { + final String memberContentTemplate = readMemberTemplate(memberTemplate); + + createTopicIfNotExists(topic, 1, (short) 1); + + for (int i = 0; i < numberOfMembers; i++) { + String memberContent = memberContentTemplate + .replace("ID", String.valueOf(i)) + .replace("DATETIME", getCurrentTimestamp()); + kafkaTemplate.send(topic, memberContent); + } + } + + private void createTopicIfNotExists(String topic, int partitions, short replicationFactor) { + Properties props = new Properties(); + props.put("bootstrap.servers", kafka.getBootstrapServers()); + + try (AdminClient adminClient = AdminClient.create(props)) { + ListTopicsResult topics = adminClient.listTopics(new ListTopicsOptions().timeoutMs(10000)); + if (!topics.names().get().contains(topic)) { + NewTopic newTopic = new NewTopic(topic, partitions, replicationFactor); + adminClient.createTopics(Collections.singleton(newTopic)).all().get(); + } + } catch (InterruptedException | ExecutionException e) { + if (!(e.getCause() instanceof TopicExistsException)) { + throw new RuntimeException("Failed to create topic", e); + } + } + } + + private String readMemberTemplate(String fileName) throws IOException, URISyntaxException { + ClassLoader classLoader = getClass().getClassLoader(); + Path path = Paths.get(Objects.requireNonNull(classLoader.getResource(fileName)).toURI()); + return Files.lines(path).collect(Collectors.joining()); + } + + private String getCurrentTimestamp() { + return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.[SSS]'Z'")); + } +} diff --git a/ldes-server-integration-test/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/LdesServerIntegrationTest.java b/ldes-server-integration-test/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/LdesServerIntegrationTest.java index 44d6816f77..e542114b7d 100644 --- a/ldes-server-integration-test/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/LdesServerIntegrationTest.java +++ b/ldes-server-integration-test/src/test/java/be/vlaanderen/informatievlaanderen/ldes/server/LdesServerIntegrationTest.java @@ -18,10 +18,12 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Import; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.*; import org.springframework.test.web.servlet.MockMvc; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; import javax.sql.DataSource; @@ -41,6 +43,7 @@ "ldes-server.maintenance-cron=*/10 * * * * *", "ldes-server.compaction-duration=PT1S" }) +@Testcontainers @SuppressWarnings("java:S2187") public class LdesServerIntegrationTest { @Autowired @@ -62,4 +65,14 @@ public class LdesServerIntegrationTest { @Autowired JobExplorer jobExplorer; + + @Container + static final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")) + .withKraft(); + + @DynamicPropertySource + static void overrideProperties(DynamicPropertyRegistry registry) { + kafka.start(); + registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers); + } } diff --git a/ldes-server-integration-test/src/test/resources/application-postgres-test.yml b/ldes-server-integration-test/src/test/resources/application-postgres-test.yml index e0b815ab7b..cc3f2571b6 100644 --- a/ldes-server-integration-test/src/test/resources/application-postgres-test.yml +++ b/ldes-server-integration-test/src/test/resources/application-postgres-test.yml @@ -18,6 +18,14 @@ spring: initialize-schema: always isolation-level-for-create: READ_COMMITTED + kafka: + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + consumer: + group-id: ldes + auto-offset-reset: earliest + zonky: test: database: diff --git a/ldes-server-integration-test/src/test/resources/data/input/eventstreams/kafka/event-stream_kafka_jsonld.ttl b/ldes-server-integration-test/src/test/resources/data/input/eventstreams/kafka/event-stream_kafka_jsonld.ttl new file mode 100644 index 0000000000..e61b98dbb2 --- /dev/null +++ b/ldes-server-integration-test/src/test/resources/data/input/eventstreams/kafka/event-stream_kafka_jsonld.ttl @@ -0,0 +1,16 @@ +@prefix ldes: . +@prefix dcterms: . +@prefix prov: . +@prefix tree: . +@prefix sh: . +@prefix server: . +@prefix xsd: . +@prefix event-stream: . + +server:jsonld + a ldes:EventStream ; + ldes:timestampPath dcterms:created ; + ldes:versionOfPath dcterms:isVersionOf ; + ldes:kafkaSource [ ldes:topic "jsonld" ; + ldes:mimeType "application/ld+json" ; ] ; + tree:shape [ a sh:NodeShape] . \ No newline at end of file diff --git a/ldes-server-integration-test/src/test/resources/data/input/eventstreams/kafka/event-stream_kafka_nq.ttl b/ldes-server-integration-test/src/test/resources/data/input/eventstreams/kafka/event-stream_kafka_nq.ttl new file mode 100644 index 0000000000..3ee24e7d50 --- /dev/null +++ b/ldes-server-integration-test/src/test/resources/data/input/eventstreams/kafka/event-stream_kafka_nq.ttl @@ -0,0 +1,16 @@ +@prefix ldes: . +@prefix dcterms: . +@prefix prov: . +@prefix tree: . +@prefix sh: . +@prefix server: . +@prefix xsd: . +@prefix event-stream: . + +server:nq + a ldes:EventStream ; + ldes:timestampPath dcterms:created ; + ldes:versionOfPath dcterms:isVersionOf ; + ldes:kafkaSource [ ldes:topic "nq" ; + ldes:mimeType "application/n-quads" ; ] ; + tree:shape [ a sh:NodeShape] . \ No newline at end of file diff --git a/ldes-server-integration-test/src/test/resources/data/input/eventstreams/kafka/event-stream_kafka_ttl.ttl b/ldes-server-integration-test/src/test/resources/data/input/eventstreams/kafka/event-stream_kafka_ttl.ttl new file mode 100644 index 0000000000..cf216bfb7d --- /dev/null +++ b/ldes-server-integration-test/src/test/resources/data/input/eventstreams/kafka/event-stream_kafka_ttl.ttl @@ -0,0 +1,16 @@ +@prefix ldes: . +@prefix dcterms: . +@prefix prov: . +@prefix tree: . +@prefix sh: . +@prefix server: . +@prefix xsd: . + +server:ttl + a ldes:EventStream ; + ldes:timestampPath dcterms:created ; + ldes:versionOfPath dcterms:isVersionOf ; + ldes:kafkaSource [ ldes:topic "ttl" ; + ldes:mimeType "text/turtle" ; ] ; + + tree:shape [ a sh:NodeShape] . \ No newline at end of file diff --git a/ldes-server-integration-test/src/test/resources/data/input/members/mob-hind.template.json b/ldes-server-integration-test/src/test/resources/data/input/members/mob-hind.template.json new file mode 100644 index 0000000000..ebcea619f2 --- /dev/null +++ b/ldes-server-integration-test/src/test/resources/data/input/members/mob-hind.template.json @@ -0,0 +1,30 @@ +[ + { + "@id": "http://test-data/mobility-hindrance/1" + }, + { + "@id": "http://test-data/mobility-hindrance/1/ID", + "http://purl.org/dc/terms/isVersionOf": [ + { + "@id": "http://test-data/mobility-hindrance/1" + } + ], + "@type": [ + "https://data.vlaanderen.be/ns/mobiliteit#Mobiliteitshinder" + ], + "http://www.opengis.net/ont/geosparql#asWKT": [ + { + "@value": "POLYGON ((3.7337472847142124 51.04745170597559, 4.359276660355135 50.851907920816956, 4.711285586572245 50.84364854093491, 4.4020885567877315 51.214619167436666, 3.7337472847142124 51.04745170597559))", + "@type": "http://www.opengis.net/ont/geosparql#wktLiteral" + } + ], + "http://purl.org/dc/terms/created": [ + { + "@value": "DATETIME" + } + ] + }, + { + "@id": "https://data.vlaanderen.be/ns/mobiliteit#Mobiliteitshinder" + } +] \ No newline at end of file diff --git a/ldes-server-integration-test/src/test/resources/data/input/members/mob-hind.template.nq b/ldes-server-integration-test/src/test/resources/data/input/members/mob-hind.template.nq new file mode 100644 index 0000000000..6edac37ec3 --- /dev/null +++ b/ldes-server-integration-test/src/test/resources/data/input/members/mob-hind.template.nq @@ -0,0 +1,4 @@ + . + . + "POLYGON ((3.7337472847142124 51.04745170597559, 4.359276660355135 50.851907920816956, 4.711285586572245 50.84364854093491, 4.4020885567877315 51.214619167436666, 3.7337472847142124 51.04745170597559))"^^ . + "DATETIME"^^ . \ No newline at end of file diff --git a/ldes-server-integration-test/src/test/resources/features/ingest/ingest-kafka.feature b/ldes-server-integration-test/src/test/resources/features/ingest/ingest-kafka.feature new file mode 100644 index 0000000000..388ad217be --- /dev/null +++ b/ldes-server-integration-test/src/test/resources/features/ingest/ingest-kafka.feature @@ -0,0 +1,25 @@ +Feature: LDES Server Kafka Ingestion + + Scenario Outline: The LDES server supports Kafka ingestion + Given I have a Kafka Container running + And I create the eventstream + When I add 5 members of template to the Kafka topic + Then the LDES contains 5 members + And I delete the eventstream + Examples: + | eventStreamFile | templateFile | topic | + | "data/input/eventstreams/kafka/event-stream_kafka_ttl.ttl" | "data/input/members/mob-hind.template.ttl" | "ttl" | + | "data/input/eventstreams/kafka/event-stream_kafka_jsonld.ttl" | "data/input/members/mob-hind.template.json" | "jsonld" | + | "data/input/eventstreams/kafka/event-stream_kafka_nq.ttl" | "data/input/members/mob-hind.template.nq" | "nq" | + + + Scenario: The LDES server supports 2 parallel kafka ingestion pipelines + Given I have a Kafka Container running + And I create the eventstream "data/input/eventstreams/kafka/event-stream_kafka_ttl.ttl" + And I create the eventstream "data/input/eventstreams/kafka/event-stream_kafka_nq.ttl" + When I add 5 members of template "data/input/members/mob-hind.template.ttl" to the Kafka topic "ttl" + When I add 10 members of template "data/input/members/mob-hind.template.nq" to the Kafka topic "nq" + Then the LDES "ttl" contains 5 members + And the LDES "nq" contains 10 members + And I delete the eventstream "ttl" + And I delete the eventstream "nq" \ No newline at end of file diff --git a/ldes-server-maintenance/ldes-server-compaction/pom.xml b/ldes-server-maintenance/ldes-server-compaction/pom.xml index 8aca882d5e..ef809b96e6 100644 --- a/ldes-server-maintenance/ldes-server-compaction/pom.xml +++ b/ldes-server-maintenance/ldes-server-compaction/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-maintenance - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-compaction diff --git a/ldes-server-maintenance/ldes-server-maintenance-common/pom.xml b/ldes-server-maintenance/ldes-server-maintenance-common/pom.xml index 390b269acd..1aacb4f659 100644 --- a/ldes-server-maintenance/ldes-server-maintenance-common/pom.xml +++ b/ldes-server-maintenance/ldes-server-maintenance-common/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-maintenance - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-maintenance-common diff --git a/ldes-server-maintenance/ldes-server-retention/pom.xml b/ldes-server-maintenance/ldes-server-retention/pom.xml index 13075da4be..a0e0428dca 100644 --- a/ldes-server-maintenance/ldes-server-retention/pom.xml +++ b/ldes-server-maintenance/ldes-server-retention/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server-maintenance - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-retention diff --git a/ldes-server-maintenance/pom.xml b/ldes-server-maintenance/pom.xml index cf4e053c81..4ddf940468 100644 --- a/ldes-server-maintenance/pom.xml +++ b/ldes-server-maintenance/pom.xml @@ -6,7 +6,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT ldes-server-maintenance diff --git a/pom.xml b/pom.xml index 5c8d091a0e..dbd8126798 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ be.vlaanderen.informatievlaanderen.vsds ldes-server - 3.5.0-SNAPSHOT + 3.6.0-SNAPSHOT pom @@ -78,6 +78,7 @@ 6.0.13 + 3.1.2 5.1.0 @@ -89,7 +90,7 @@ 1.10.2 5.8.2 5.10.2 - 5.11.0 + 5.14.2 1.14.12 2.35.2 4.2.0