diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java index e9ca335..05d6506 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java @@ -3,29 +3,83 @@ import java.time.Instant; import java.util.Collection; +/** + * This interface represents the database table where event log entries are stored. + * Normal users of the library don't need to care about this, it's implemented in + * nakadi-producer-spring-boot-starter and used in nakadi-producer. + * Only if you are using nakadi-producer without the spring-boot-starter, you'll have to implement it. + */ public interface EventLogRepository { - Collection findByLockedByAndLockedUntilGreaterThan(String lockedBy, Instant lockedUntil); + /** + * Fetched events which were locked by the given lock ID, and whose lock is not yet expired. + * @param lockId the lock ID used for locking. This should be the same value as previously used + * in {@link #lockSomeMessages(String, Instant, Instant)} for locking the event log entries. + * @param lockedUntil A cut-off for the expiry time. Use a time here where you are reasonably confident + * that you can send out the fetched events until this time. + * @return the fetched events. + */ + Collection findByLockedByAndLockedUntilGreaterThan(String lockId, Instant lockedUntil); + /** + * Lock some event log entries, so that other instances won't try to send them out. + * You can later retrieve the locked entries with {@link #findByLockedByAndLockedUntilGreaterThan(String, Instant)}. + * @param lockId a unique identifier for this instance / job run / etc. The same value should + * later be used for fetching them in {@link #findByLockedByAndLockedUntilGreaterThan(String, Instant)}. + * @param now existing locked event logs whose lock expiry time is before this value can be locked again. + * @param lockExpires an expiry time to use for the new locks. + */ void lockSomeMessages(String lockId, Instant now, Instant lockExpires); + /** + * Deletes a single event log entry from the database. + * @param eventLog the event log entry. Only its id property is used. + */ void delete(EventLog eventLog); + /** + * Deletes multiple event log entries. + * @param eventLogs A collection of event log entries. + * Only their id properties are be used. + */ default void delete(Collection eventLogs) { eventLogs.forEach(this::delete); } + /** + * Persists a single eventlog entry. + * @param eventLog the event log entry to insert into the database. + * On return, it's id property will be filled with the generated identifier. + */ void persist(EventLog eventLog); + /** + * Persist multiple event log entries at once. + * @param eventLogs A collection of event logs entries. + * On return, their id properties will be filled with the generated identifiers. + */ default void persist(Collection eventLogs) { eventLogs.forEach(this::persist); } + /** + * Persists and immediately deletes some event log entries. + * This is meant to be used together with infrastructure listening to a logical DB replication stream. + * @param eventLogs the event log entries to be persisted and deleted. + */ default void persistAndDelete(Collection eventLogs) { persist(eventLogs); delete(eventLogs); }; + /** + * Deletes all event log entries. This is only meant for cleanup in tests. + */ void deleteAll(); + /** + * Fetches a specific event log by its ID. This is only meant to be used in tests. + * @param id the id attribute. + * @return the event log entry with the given ID, or null if there is none. + */ EventLog findOne(Integer id); }