Skip to content

Commit

Permalink
Merge branch 'zalando-nakadi:master' into issues/140-persist-multiple…
Browse files Browse the repository at this point in the history
…-events-in-batch
  • Loading branch information
fbrns authored Nov 22, 2022
2 parents 908a03a + dc01695 commit 44307b2
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 6 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,15 @@ until the lock expires. The default is currently 600 seconds but might change in
buffer is included. During the last x seconds before the expiration of the lock the events are not considered for
transmission. The default is currently 60 seconds but might change in future releases.

* **lock-size**: Defines the maximum amount of events which are loaded into memory and published in one run
(in one submission per event type). By default, all events are loaded into memory. In future releases, this
property will become mandatory.

```yaml
nakadi-producer:
lock-duration: 600
lock-duration-buffer: 60
lock-size: 5000
```

## Contributing
Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer-loadtest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
<version>42.3.3</version>
</dependency>
<dependency>
<groupId>org.zalando.stups</groupId>
Expand Down
2 changes: 1 addition & 1 deletion nakadi-producer-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.4.1211</version>
<version>42.2.25</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, Obje
}

@Bean
public EventLogRepository eventLogRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
return new EventLogRepositoryImpl(namedParameterJdbcTemplate);
public EventLogRepository eventLogRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate,
@Value("${nakadi-producer.lock-size:0}") int lockSize) {
return new EventLogRepositoryImpl(namedParameterJdbcTemplate, lockSize);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

public class EventLogRepositoryImpl implements EventLogRepository {

private NamedParameterJdbcTemplate jdbcTemplate;
private int lockSize;

public EventLogRepositoryImpl(NamedParameterJdbcTemplate jdbcTemplate) {
public EventLogRepositoryImpl(NamedParameterJdbcTemplate jdbcTemplate, int lockSize) {
this.jdbcTemplate = jdbcTemplate;
this.lockSize = lockSize;
}

@Override
Expand All @@ -37,8 +40,21 @@ public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) {
namedParameterMap.put("lockId", lockId);
namedParameterMap.put("now", toSqlTimestamp(now));
namedParameterMap.put("lockExpires", toSqlTimestamp(lockExpires));

StringBuilder optionalLockSizeClause = new StringBuilder();
if (lockSize > 0) {
optionalLockSizeClause.append("LIMIT :lockSize");
namedParameterMap.put("lockSize", lockSize);
}

jdbcTemplate.update(
"UPDATE nakadi_events.event_log SET locked_by = :lockId, locked_until = :lockExpires where locked_until is null or locked_until < :now",
"UPDATE nakadi_events.event_log "
+ "SET locked_by = :lockId, locked_until = :lockExpires "
+ "WHERE id IN (SELECT id "
+ " FROM nakadi_events.event_log "
+ " WHERE locked_until IS null OR locked_until < :now "
+ optionalLockSizeClause
+ " FOR UPDATE SKIP LOCKED) ",
namedParameterMap
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.zalando.nakadiproducer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService;
import org.zalando.nakadiproducer.util.Fixture;

@SpringBootTest(
properties = {"nakadi-producer.lock-size=3"}
)
public class EventLockSizeConfiguredIT extends BaseMockedExternalCommunicationIT {

@Autowired
private EventLogWriter eventLogWriter;

@Autowired
private EventTransmissionService eventTransmissionService;

@Test
public void eventLockSizeIsRespected() {

for (int i = 1; i <= 8; i++) {
eventLogWriter.fireBusinessEvent( "myEventType", Fixture.mockPayload(i, "code123"));
}

assertThat(eventTransmissionService.lockSomeEvents(), hasSize(3));
assertThat(eventTransmissionService.lockSomeEvents(), hasSize(3));
assertThat(eventTransmissionService.lockSomeEvents(), hasSize(2));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.zalando.nakadiproducer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
import org.zalando.nakadiproducer.transmission.impl.EventTransmissionService;
import org.zalando.nakadiproducer.util.Fixture;

public class EventLockSizeDefaultIT extends BaseMockedExternalCommunicationIT {

@Autowired
private EventLogWriter eventLogWriter;

@Autowired
private EventTransmissionService eventTransmissionService;

@Test
public void defaultEventLockSizeIsUsed() {

for (int i = 1; i <= 8; i++) {
eventLogWriter.fireBusinessEvent("myEventType", Fixture.mockPayload(i, "code123"));
}

assertThat(eventTransmissionService.lockSomeEvents(), hasSize(8));
}

}

0 comments on commit 44307b2

Please sign in to comment.