Skip to content

Commit

Permalink
zalando-nakadi#112 Do not lock all available events
Browse files Browse the repository at this point in the history
  • Loading branch information
fbrns committed Nov 15, 2022
1 parent 17270e0 commit 6c31f4c
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 18 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ until the lock expires. The default is currently 600 seconds but might change in
buffer is included. During the last x seconds before the expiration of the lock the events are not considered for
transmission. The default is currently 60 seconds but might change in future releases.

* **lock-size**: Defines the amount of events which are loaded into memory and published in one run.
By default, all events are loaded into memory.
* **lock-size**: Defines the maximum amount of events which are loaded into memory and published in one run
(in one submission per event type). By default, all events are loaded into memory.

```yaml
nakadi-producer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, Obje
}

@Bean
public EventLogRepository eventLogRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
return new EventLogRepositoryImpl(namedParameterJdbcTemplate);
public EventLogRepository eventLogRepository(NamedParameterJdbcTemplate namedParameterJdbcTemplate,
@Value("${nakadi-producer.lock-size:0}") int lockSize) {
return new EventLogRepositoryImpl(namedParameterJdbcTemplate, lockSize);
}

@Bean
Expand All @@ -153,11 +154,10 @@ public EventTransmissionService eventTransmissionService(
EventLogRepository eventLogRepository,
NakadiPublishingClient nakadiPublishingClient,
ObjectMapper objectMapper,
@Value("${nakadi-producer.lock-size:0}") int lockSize,
@Value("${nakadi-producer.lock-duration:600}") int lockDuration,
@Value("${nakadi-producer.lock-duration-buffer:60}") int lockDurationBuffer) {
return new EventTransmissionService(
eventLogRepository, nakadiPublishingClient, objectMapper, lockSize, lockDuration, lockDurationBuffer);
eventLogRepository, nakadiPublishingClient, objectMapper, lockDuration, lockDurationBuffer);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import org.springframework.jdbc.support.GeneratedKeyHolder;

public class EventLogRepositoryImpl implements EventLogRepository {

private NamedParameterJdbcTemplate jdbcTemplate;
private int lockSize;

public EventLogRepositoryImpl(NamedParameterJdbcTemplate jdbcTemplate) {
public EventLogRepositoryImpl(NamedParameterJdbcTemplate jdbcTemplate, int lockSize) {
this.jdbcTemplate = jdbcTemplate;
this.lockSize = lockSize;
}

@Override
Expand All @@ -32,7 +35,7 @@ public Collection<EventLog> findByLockedByAndLockedUntilGreaterThan(String locke
}

@Override
public void lockSomeMessages(String lockId, int lockSize, Instant now, Instant lockExpires) {
public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) {
Map<String, Object> namedParameterMap = new HashMap<>();
namedParameterMap.put("lockId", lockId);
namedParameterMap.put("now", toSqlTimestamp(now));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
public interface EventLogRepository {
Collection<EventLog> findByLockedByAndLockedUntilGreaterThan(String lockedBy, Instant lockedUntil);

void lockSomeMessages(String lockId, int lockSize, Instant now, Instant lockExpires);
void lockSomeMessages(String lockId, Instant now, Instant lockExpires);

void delete(EventLog eventLog);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,25 @@ public class EventTransmissionService {
private final EventLogRepository eventLogRepository;
private final NakadiPublishingClient nakadiPublishingClient;
private final ObjectMapper objectMapper;
private final int lockSize;
private final int lockDuration;
private final int lockDurationBuffer;

private Clock clock = Clock.systemDefaultZone();

public EventTransmissionService(EventLogRepository eventLogRepository,
NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper, int lockSize,
int lockDuration, int lockDurationBuffer) {
public EventTransmissionService(EventLogRepository eventLogRepository, NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper,
int lockDuration, int lockDurationBuffer) {
this.eventLogRepository = eventLogRepository;
this.nakadiPublishingClient = nakadiPublishingClient;
this.objectMapper = objectMapper;
this.lockSize = lockSize;
this.lockDuration = lockDuration;
this.lockDurationBuffer = lockDurationBuffer;
}

@Transactional
public Collection<EventLog> lockSomeEvents() {
String lockId = UUID.randomUUID().toString();
log.debug("Locking {} events for replication with lockId {} for {} seconds", lockSize,
lockId, lockDuration);
eventLogRepository.lockSomeMessages(lockId, lockSize, now(), now().plus(lockDuration, SECONDS));
log.debug("Locking events for replication with lockId {} for {} seconds", lockId, lockDuration);
eventLogRepository.lockSomeMessages(lockId, now(), now().plus(lockDuration, SECONDS));
return eventLogRepository.findByLockedByAndLockedUntilGreaterThan(lockId, now());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void setUp() {
repo = mock(EventLogRepository.class);
publishingClient = spy(new MockNakadiPublishingClient());
mapper = spy(new ObjectMapper());
service = new EventTransmissionService(repo, publishingClient, mapper, 0, 600, 60);
service = new EventTransmissionService(repo, publishingClient, mapper, 600, 60);
}

@Test
Expand Down

0 comments on commit 6c31f4c

Please sign in to comment.