diff --git a/README.md b/README.md
index 530bf112..3a2a4d6d 100644
--- a/README.md
+++ b/README.md
@@ -335,10 +335,15 @@ until the lock expires. The default is currently 600 seconds but might change in
buffer is included. During the last x seconds before the expiration of the lock the events are not considered for
transmission. The default is currently 60 seconds but might change in future releases.
+* **lock-size**: Defines the maximum amount of events which are loaded into memory and published in one run
+(in one submission per event type). By default, all events are loaded into memory. In future releases, this
+property will become mandatory.
+
```yaml
nakadi-producer:
lock-duration: 600
lock-duration-buffer: 60
+ lock-size: 5000
```
## Contributing
diff --git a/nakadi-producer-loadtest/pom.xml b/nakadi-producer-loadtest/pom.xml
index 9720b3f7..b6e08351 100644
--- a/nakadi-producer-loadtest/pom.xml
+++ b/nakadi-producer-loadtest/pom.xml
@@ -11,7 +11,7 @@
org.zalando
nakadi-producer-reactor
- 20.3.1
+ 20.4.0
@@ -62,4 +62,4 @@
-
\ No newline at end of file
+
diff --git a/nakadi-producer-spring-boot-starter/pom.xml b/nakadi-producer-spring-boot-starter/pom.xml
index 66a8a38e..bdd9fa62 100644
--- a/nakadi-producer-spring-boot-starter/pom.xml
+++ b/nakadi-producer-spring-boot-starter/pom.xml
@@ -10,7 +10,7 @@
org.zalando
nakadi-producer-reactor
- 20.3.1
+ 20.4.0
nakadi-producer-spring-boot-starter
@@ -194,4 +194,4 @@
-
\ No newline at end of file
+
diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java
index 6344d734..6f35e23c 100644
--- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java
+++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java
@@ -133,8 +133,9 @@ public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, Obje
}
@Bean
- public EventLogRepository eventLogRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
- return new EventLogRepositoryImpl(namedParameterJdbcTemplate);
+ public EventLogRepository eventLogRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate,
+ @Value("${nakadi-producer.lock-size:0}") int lockSize) {
+ return new EventLogRepositoryImpl(namedParameterJdbcTemplate, lockSize);
}
@Bean
diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java
index b8af1121..bbb62b40 100644
--- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java
+++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java
@@ -13,10 +13,13 @@
import org.springframework.jdbc.support.GeneratedKeyHolder;
public class EventLogRepositoryImpl implements EventLogRepository {
+
private NamedParameterJdbcTemplate jdbcTemplate;
+ private int lockSize;
- public EventLogRepositoryImpl(NamedParameterJdbcTemplate jdbcTemplate) {
+ public EventLogRepositoryImpl(NamedParameterJdbcTemplate jdbcTemplate, int lockSize) {
this.jdbcTemplate = jdbcTemplate;
+ this.lockSize = lockSize;
}
@Override
@@ -37,8 +40,21 @@ public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) {
namedParameterMap.put("lockId", lockId);
namedParameterMap.put("now", toSqlTimestamp(now));
namedParameterMap.put("lockExpires", toSqlTimestamp(lockExpires));
+
+ StringBuilder optionalLockSizeClause = new StringBuilder();
+ if (lockSize > 0) {
+ optionalLockSizeClause.append("LIMIT :lockSize");
+ namedParameterMap.put("lockSize", lockSize);
+ }
+
jdbcTemplate.update(
- "UPDATE nakadi_events.event_log SET locked_by = :lockId, locked_until = :lockExpires where locked_until is null or locked_until < :now",
+ "UPDATE nakadi_events.event_log "
+ + "SET locked_by = :lockId, locked_until = :lockExpires "
+ + "WHERE id IN (SELECT id "
+ + " FROM nakadi_events.event_log "
+ + " WHERE locked_until IS null OR locked_until < :now "
+ + optionalLockSizeClause
+ + " FOR UPDATE SKIP LOCKED) ",
namedParameterMap
);
}
diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EventLockSizeConfiguredIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EventLockSizeConfiguredIT.java
new file mode 100644
index 00000000..ba616d86
--- /dev/null
+++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EventLockSizeConfiguredIT.java
@@ -0,0 +1,36 @@
+package org.zalando.nakadiproducer;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.zalando.nakadiproducer.eventlog.EventLogWriter;
+import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService;
+import org.zalando.nakadiproducer.util.Fixture;
+
+@SpringBootTest(
+ properties = {"nakadi-producer.lock-size=3"}
+)
+public class EventLockSizeConfiguredIT extends BaseMockedExternalCommunicationIT {
+
+ @Autowired
+ private EventLogWriter eventLogWriter;
+
+ @Autowired
+ private EventTransmissionService eventTransmissionService;
+
+ @Test
+ public void eventLockSizeIsRespected() {
+
+ for (int i = 1; i <= 8; i++) {
+ eventLogWriter.fireBusinessEvent( "myEventType", Fixture.mockPayload(i, "code123"));
+ }
+
+ assertThat(eventTransmissionService.lockSomeEvents(), hasSize(3));
+ assertThat(eventTransmissionService.lockSomeEvents(), hasSize(3));
+ assertThat(eventTransmissionService.lockSomeEvents(), hasSize(2));
+ }
+
+}
diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EventLockSizeDefaultIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EventLockSizeDefaultIT.java
new file mode 100644
index 00000000..08ddfc8a
--- /dev/null
+++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EventLockSizeDefaultIT.java
@@ -0,0 +1,30 @@
+package org.zalando.nakadiproducer;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.zalando.nakadiproducer.eventlog.EventLogWriter;
+import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService;
+import org.zalando.nakadiproducer.util.Fixture;
+
+public class EventLockSizeDefaultIT extends BaseMockedExternalCommunicationIT {
+
+ @Autowired
+ private EventLogWriter eventLogWriter;
+
+ @Autowired
+ private EventTransmissionService eventTransmissionService;
+
+ @Test
+ public void defaultEventLockSizeIsUsed() {
+
+ for (int i = 1; i <= 8; i++) {
+ eventLogWriter.fireBusinessEvent("myEventType", Fixture.mockPayload(i, "code123"));
+ }
+
+ assertThat(eventTransmissionService.lockSomeEvents(), hasSize(8));
+ }
+
+}
diff --git a/nakadi-producer-starter-spring-boot-2-test/pom.xml b/nakadi-producer-starter-spring-boot-2-test/pom.xml
index 2d434a14..8274d3ad 100644
--- a/nakadi-producer-starter-spring-boot-2-test/pom.xml
+++ b/nakadi-producer-starter-spring-boot-2-test/pom.xml
@@ -10,7 +10,7 @@
org.zalando
nakadi-producer-reactor
- 20.3.1
+ 20.4.0
@@ -92,4 +92,4 @@
-
\ No newline at end of file
+
diff --git a/nakadi-producer/pom.xml b/nakadi-producer/pom.xml
index 2c433840..c054c322 100644
--- a/nakadi-producer/pom.xml
+++ b/nakadi-producer/pom.xml
@@ -10,7 +10,7 @@
org.zalando
nakadi-producer-reactor
- 20.3.1
+ 20.4.0
nakadi-producer
@@ -161,4 +161,4 @@
-
\ No newline at end of file
+
diff --git a/pom.xml b/pom.xml
index 4556ac4a..38af18ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
nakadi-producer-reactor
org.zalando
- 20.3.1
+ 20.4.0
pom
Nakadi Event Producer Reactor
@@ -78,4 +78,4 @@
-
\ No newline at end of file
+