Skip to content

Commit

Permalink
feat: kafka in + bump 3.6.0 (#1414)
Browse files Browse the repository at this point in the history
* wip: first setup

* feat: working kafka listener

* feat: working kafka source in admin

* fix: update bean name in tests

* feat: working kafka source in admin

* feat: add integration test

* feat: add integration test

* fix test

* fix test

* chore: add kafka jar

* sonar

* add assembly

* docs: documentation effort

* chore: pr remarks

* ci: prepare release 3.6.0

---------

Co-authored-by: Yalz <[email protected]>
  • Loading branch information
Yalz and Yalz authored Nov 18, 2024
1 parent fb2939e commit 50ac3a6
Show file tree
Hide file tree
Showing 102 changed files with 1,370 additions and 169 deletions.
2 changes: 2 additions & 0 deletions .github/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ COPY ./ldes-server-ingest/ldes-server-ingest-rest/target/ldes-server-ingest-rest
COPY ./ldes-server-fetch/ldes-server-fetch-rest/target/ldes-server-fetch-rest-jar-with-dependencies.jar ./lib/
COPY ./ldes-server-admin/ldes-server-admin-rest/target/ldes-server-admin-rest-jar-with-dependencies.jar ./lib/

COPY ./ldes-server-ingest/ldes-server-ingest-kafka/target/ldes-server-ingest-kafka-jar-with-dependencies.jar ./lib/

# Plugin Fragmentations
COPY ./ldes-server-fragmentation/ldes-server-fragmentation-geospatial/target/ldes-server-fragmentation-geospatial-jar-with-dependencies.jar ./lib/
COPY ./ldes-server-fragmentation/ldes-server-fragmentation-timebased-hierarchical/target/ldes-server-fragmentation-timebased-hierarchical-jar-with-dependencies.jar ./lib/
Expand Down
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,19 @@ Each maven profile represents a different functionality of the LDES server that
The default exported image contains all the profiles, but a custom image can be created with only the needed
dependencies.

| Profile | Dependencies | Description |
|----------------------------|--------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
| `fragmentation` | `postgres-pagination-repository` | Allows basic fragmentation (pagination) |
| `maintenance` | `postgres-maintenance-repository` | Allows the LDES server to perform maintenance operations on its Event Streams and Members: compaction, retention, deletion. |
| **Interfaces** | | |
| `http-admin` | `ldes-server-admin-rest`,`postgres-admin-repository`* | Gives access to REST API to create and manage Event Streams and Views. |
| `http-ingest` | `ldes-server-ingest-rest`,`postgres-ingest-repository` | Gives access to REST API to ingest members into the LDES. |
| `http-fetch` | `ldes-server-fetch-rest`,`postgres-fetch-repository` | Gives access to REST API to fetch Event Streams, its Views and pages. |
| **Plugin Fragmentations** | | |
| `fragmentation-timebased` | `ldes-server-fragmentation-timebased-hierarchical` | Allows fragmentation in based on a timebased property. |
| `fragmentation-geospatial` | `ldes-server-fragmentation-geospatial` | Allows fragmentation in based on a geospatial property. |
| `fragmentation-reference` | `ldes-server-fragmentation-reference` | Allows fragmentation in based on a textual property. |
| Profile | Dependencies | Description |
|----------------------------|---------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|
| `fragmentation` | `postgres-pagination-repository` | Allows basic fragmentation (pagination) |
| `maintenance` | `postgres-maintenance-repository` | Allows the LDES server to perform maintenance operations on its Event Streams and Members: compaction, retention, deletion. |
| **Interfaces** | | |
| `http-admin` | `ldes-server-admin-rest`,`postgres-admin-repository`* | Gives access to REST API to create and manage Event Streams and Views. |
| `http-ingest` | `ldes-server-ingest-rest`,`postgres-ingest-repository` | Gives access to REST API to ingest members into the LDES. |
| `kafka-ingest` | `ldes-server-ingest-kafka`,`postgres-ingest-repository` | Allows Kafka member ingestion into the LDES. |
| `http-fetch` | `ldes-server-fetch-rest`,`postgres-fetch-repository` | Gives access to REST API to fetch Event Streams, its Views and pages. |
| **Plugin Fragmentations** | | |
| `fragmentation-timebased` | `ldes-server-fragmentation-timebased-hierarchical` | Allows fragmentation in based on a timebased property. |
| `fragmentation-geospatial` | `ldes-server-fragmentation-geospatial` | Allows fragmentation in based on a geospatial property. |
| `fragmentation-reference` | `ldes-server-fragmentation-reference` | Allows fragmentation in based on a textual property. |


*: The `postgres-admin-repository`, as shown by the dependency graph, will be loaded in by the other above-mentioned functionality profiles.
Expand Down
4 changes: 4 additions & 0 deletions content/dependency-graph.dot
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ digraph "ldes-server" {
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-common:jar:compile"[label=<ldes-server-ingest-common>]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-rest:jar:compile"[label=<ldes-server-fetch-rest>, fillcolor="#87CEFA", style="filled"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-common:jar:compile"[label=<ldes-server-fetch-common>]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-kafka:jar:compile"[label=<ldes-server-ingest-kafka>, fillcolor="#87CEFA", style="filled"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-retention:jar:compile"[label=<ldes-server-retention>]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-maintenance-common:jar:compile"[label=<ldes-server-maintenance-common>]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-compaction:jar:compile"[label=<ldes-server-compaction>]
Expand All @@ -33,6 +34,7 @@ digraph "ldes-server" {
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-fragmentation-reference:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fragmentation-common:jar:compile"
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-admin-rest:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-admin-common:jar:compile"
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-rest:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-common:jar:compile"
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-kafka:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-common:jar:compile"
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-rest:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-common:jar:compile"
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-rest:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-admin-common:jar:compile"
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-retention:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-maintenance-common:jar:compile"
Expand All @@ -52,6 +54,7 @@ digraph "ldes-server" {
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-liquibase:jar:compile" [label="Database Changes"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-rest:jar:compile" [label="Ingest REST API"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-ingest-repository:jar:compile" [label="Ingest Implementation"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-ingest-kafka:jar:compile" [label="Ingest Kafka"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fetch-rest:jar:compile" [label="Fetch REST API"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-fetch-repository:jar:compile" [label="Fetch Implementation"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-pagination-repository:jar:compile" [label="Basic Fragmentation Implementation"]
Expand All @@ -60,4 +63,5 @@ digraph "ldes-server" {
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-fragmentation-reference:jar:compile" [label="Plugin Fragmentation"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:ldes-server-admin-rest:jar:compile" [label="Admin REST API"]
"be.vlaanderen.informatievlaanderen.vsds:ldes-server-application:jar:compile" -> "be.vlaanderen.informatievlaanderen.vsds:postgres-maintenance-repository:jar:compile" [label="Maintenance Implementation"]

}
Binary file modified content/ldes-server-graph.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
71 changes: 71 additions & 0 deletions docker-compose/kafka/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
version: "3.3"
services:
ldes-server:
container_name: basic_ldes-server
image: ldes/ldes-server
environment:
- SPRING_CONFIG_LOCATION=/config/
- PYROSCOPE_CONFIGURATION_FILE=/config/pyroscope.properties
volumes:
- ./docker-compose/server.config.yml:/config/application.yml:ro
- ./docker-compose/pyroscope.properties:/config/pyroscope.properties:ro
ports:
- "8080:8080"
- "8087:8087"
- "8088:8088"
- "8089:8089"
networks:
- ldes
depends_on:
- postgres
postgres:
container_name: ldes-postgres
image: postgres:14-alpine
ports:
- 5432:5432
environment:
- POSTGRES_PASSWORD=admin
- POSTGRES_USER=admin
- POSTGRES_DB=test
networks:
- ldes
pyroscope:
image: grafana/pyroscope:latest
ports:
- 4040:4040
networks:
- ldes

zookeeper:
image: zookeeper:3.7.0
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka:7.3.2
ports:
- "9092:9092"
- "29092:29092"
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "9200:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
networks:
ldes:
4 changes: 4 additions & 0 deletions docs/_configuration/event-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ server:generic-eventstream a ldes:EventStream ;
genericES:shape a sh:NodeShape .
````

## Configuring an Event Stream with a Kafka source

To configure an Event Stream that ingests members from a Kafka topic, please visit the [Kafka Ingest documentation](../ingest/kafka).

## Configuring a SHACL Shape

[SHACL (Shapes Constraint Language)](https://www.w3.org./TR/shacl/) is a standard for validating RDF data and ensuring
Expand Down
70 changes: 70 additions & 0 deletions docs/_ingest/kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
---
layout: default
title: Ingest Members With Kafka
nav_order: 0
---

# Ingest Members With Kafka

Ingesting members into an Event Stream without too much overhead?
That's now possible via ingestion over Apache Kafka.

## Getting Started

To get started with ingesting members via Kafka, you need to have the following:
* Kafka consumer configuration (in the Application Properties)
* Event Stream configuration pointing to a Kafka topic (in the Admin API)

### Application Properties

The Kafka consumer configuration can be set in the `application.properties` file.

The most basic properties that are needed are:
````yaml
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
````

To guarantee that the Kafka consumer will always read from the beginning of the topic, you can add the following property:
````yaml
spring.kafka.consumer.auto-offset-reset=earliest
````

For more advanced options to configure advanced Kafka connections, please refer to the [Spring Kafka documentation](https://docs.spring.io/spring-boot/appendix/application-properties/index.html).

### Event Stream Configuration

To configure a new Event Stream that uses Kafka as the ingestion method,
you need to create an Event Stream configuration that points to a Kafka topic. \
This can be done by adding a `https://w3id.org/ldes#KafkaEventStream` object to the Event Stream configuration.

This object should contain the following properties:
* `ldes:topic` - The Kafka topic to which the members should be ingested.
* `ldes:mimeType` - The mime type in which the data of your topic will be. This is used to parse your member to a model. \
This can be `application/ld+json`, `application/json`, `text/turtle`, ... \
All members in your topic need to therefor be in one mime type.

#### Example

Creating a generic Event Stream named "event-stream" that uses Kafka as the ingestion method.

````turtle
@prefix ldes: <https://w3id.org/ldes#> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix prov: <http://www.w3.org/ns/prov#> .
@prefix tree: <https://w3id.org/tree#>.
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix server: <http://localhost:8080/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix event-stream: <http://localhost:8080/event-stream/> .
server:event-stream a ldes:EventStream ;
ldes:timestampPath dcterms:created ;
ldes:versionOfPath dcterms:isVersionOf ;
tree:shape [ a sh:NodeShape ] ;
ldes:kafkaSource [
ldes:topic "testTopic" ;
ldes:mimeType "application/n-quads" ;
] .
````
2 changes: 1 addition & 1 deletion ldes-server-admin/ldes-server-admin-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>be.vlaanderen.informatievlaanderen.vsds</groupId>
<artifactId>ldes-server-admin</artifactId>
<version>3.5.0-SNAPSHOT</version>
<version>3.6.0-SNAPSHOT</version>
</parent>

<artifactId>ldes-server-admin-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ public void initViews() {
repository
.getAllEventSources()
.forEach(eventSource -> eventPublisher
.publishEvent(new DeletionPolicyChangedEvent(eventSource.getCollectionName(), eventSource.getRetentionPolicies())));
.publishEvent(new DeletionPolicyChangedEvent(eventSource.collectionName(), eventSource.retentionPolicies())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface EventStreamRepository {

Optional<EventStreamTO> retrieveEventStreamTO(String collectionName);

void saveEventStream(EventStreamTO eventStreamTO);
Integer saveEventStream(EventStreamTO eventStreamTO);

int deleteEventStream(String collectionName);

Expand Down
Loading

0 comments on commit 50ac3a6

Please sign in to comment.