From 4e41d31562f30a58c8c4e029d48bc16cdf715ab6 Mon Sep 17 00:00:00 2001 From: Paul Ebermann Date: Fri, 28 Jul 2023 19:11:23 +0200 Subject: [PATCH 1/2] Batch deletion of eventlogs * repository: add batch deletion method to interface (with default implementation) * repository: implement batch deletion (uses updateBatch) * transmission: use batch deletion --- .../eventlog/impl/EventLogRepositoryImpl.java | 18 +++++++--- .../eventlog/impl/EventLogRepository.java | 4 +++ .../impl/EventTransmissionService.java | 2 +- .../impl/EventTransmissionServiceTest.java | 34 +++++++++++++------ 4 files changed, 42 insertions(+), 16 deletions(-) diff --git a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java index 9ed63e3..c62dd60 100644 --- a/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java +++ b/nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java @@ -61,11 +61,19 @@ public void lockSomeMessages(String lockId, Instant now, Instant lockExpires) { @Override public void delete(EventLog eventLog) { - Map namedParameterMap = new HashMap<>(); - namedParameterMap.put("id", eventLog.getId()); - jdbcTemplate.update( - "DELETE FROM nakadi_events.event_log where id = :id", - namedParameterMap + delete(Collections.singleton(eventLog)); + } + + @Override + public void delete(Collection eventLogs) { + MapSqlParameterSource[] namedParameterMaps = eventLogs.stream() + .map(eventLog -> + new MapSqlParameterSource().addValue("id", eventLog.getId()) + ).toArray(MapSqlParameterSource[]::new); + + jdbcTemplate.batchUpdate( + "DELETE FROM nakadi_events.event_log where id = :id", + namedParameterMaps ); } diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java index 1d3bcbd..2cfdb31 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepository.java @@ -10,6 +10,10 @@ public interface EventLogRepository { void delete(EventLog eventLog); + default void delete(Collection eventLogs) { + eventLogs.forEach(this::delete); + } + void persist(EventLog eventLog); default void persist(Collection eventLogs) { diff --git a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java index f56f730..a12ad38 100644 --- a/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java +++ b/nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java @@ -116,7 +116,7 @@ private void tryToPublishBatch(List batch) throws Exception { .filter(rawEvent -> !failedEids.contains(convertToUUID(rawEvent.getId()))); } - successfulEvents.forEach(eventLogRepository::delete); + eventLogRepository.delete(successfulEvents.collect(Collectors.toList())); } private List collectEids(EventPublishingException e) { diff --git a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java index 1517ec6..8ea8a05 100644 --- a/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java +++ b/nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java @@ -6,7 +6,10 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import org.zalando.fahrschein.EventPublishingException; import org.zalando.fahrschein.domain.BatchItemResponse; import org.zalando.nakadiproducer.eventlog.impl.EventLog; @@ -16,20 +19,23 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; +import java.util.stream.Collectors; import static com.jayway.jsonpath.JsonPath.read; import static java.time.Instant.now; import static java.time.temporal.ChronoUnit.MINUTES; import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Collections.singletonList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doCallRealMethod; @@ -47,6 +53,8 @@ public class EventTransmissionServiceTest { private MockNakadiPublishingClient publishingClient; private ObjectMapper mapper; private EventLogRepository repo; + @Captor + private ArgumentCaptor> eventLogColCaptor; @BeforeEach public void setUp() { @@ -54,6 +62,7 @@ public void setUp() { publishingClient = spy(new MockNakadiPublishingClient()); mapper = spy(new ObjectMapper().registerModules(new JavaTimeModule())); service = new EventTransmissionService(repo, publishingClient, mapper, 600, 60); + MockitoAnnotations.openMocks(this); } @Test @@ -135,9 +144,8 @@ public void testErrorInPayloadDeserializationIsHandledGracefully() throws IOExce assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003")); // and only the successful ones have been deleted. - verify(repo).delete(ev1); - verify(repo, never()).delete(ev2); - verify(repo).delete(ev3); + List deletedEvents = verifyDeletionAndGetAllDeletedEvents(); + assertThat(deletedEvents, containsInAnyOrder(ev1, ev3)); } @Test @@ -164,9 +172,8 @@ public void testUnknownErrorInTransmissionIsHandledGracefully() throws Exception assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003")); // and only the successful ones have been deleted. - verify(repo, never()).delete(ev1); - verify(repo, never()).delete(ev2); - verify(repo).delete(ev3); + List deletedEvents = verifyDeletionAndGetAllDeletedEvents(); + assertThat(deletedEvents, containsInAnyOrder(ev3)); } @Test @@ -196,9 +203,8 @@ public void testEventPublishingExceptionIsHandledGracefully() throws Exception { assertThat(read(type2Events.get(0), "$.metadata.eid"), is("00000000-0000-0000-0000-000000000003")); // and only the successful ones have been deleted. - verify(repo).delete(ev1); - verify(repo, never()).delete(ev2); - verify(repo).delete(ev3); + List deletedEvents = verifyDeletionAndGetAllDeletedEvents(); + assertThat(deletedEvents, containsInAnyOrder(ev1, ev3)); } @Test @@ -260,4 +266,12 @@ private TypeReference> anyLinkedHashmapTypeReferen return any(); } + private List verifyDeletionAndGetAllDeletedEvents() { + verify(repo, Mockito.atLeastOnce()).delete(eventLogColCaptor.capture()); + verify(repo, never()).delete(any(EventLog.class)); + return eventLogColCaptor.getAllValues() + .stream() + .flatMap(c -> c.stream()) + .collect(Collectors.toList()); + } } From 23b471f93914ecc866582bc83fd43b981aa4912a Mon Sep 17 00:00:00 2001 From: Paul Ebermann Date: Fri, 28 Jul 2023 19:48:37 +0200 Subject: [PATCH 2/2] add test for deletion --- .../eventlog/impl/EventLogRepositoryIT.java | 37 ++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java index c8e887f..4370e1e 100644 --- a/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java +++ b/nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java @@ -1,6 +1,7 @@ package org.zalando.nakadiproducer.eventlog.impl; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.Is.is; import javax.transaction.Transactional; @@ -8,9 +9,12 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jdbc.core.BeanPropertyRowMapper; import org.springframework.jdbc.core.JdbcTemplate; import org.zalando.nakadiproducer.BaseMockedExternalCommunicationIT; +import java.util.List; + @Transactional public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT { @@ -44,14 +48,45 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT { public void setUp() { eventLogRepository.deleteAll(); + persistTestEvent("FLOW_ID"); + } + + private void persistTestEvent(String flowId) { final EventLog eventLog = EventLog.builder() .eventBodyData(WAREHOUSE_EVENT_BODY_DATA) .eventType(WAREHOUSE_EVENT_TYPE) .compactionKey(COMPACTION_KEY) - .flowId("FLOW_ID").build(); + .flowId(flowId) + .build(); eventLogRepository.persist(eventLog); } + @Test + public void testDeleteMultipleEvents() { + persistTestEvent("second_Flow-ID"); + persistTestEvent("third flow-ID"); + persistTestEvent("fourth flow-ID"); + persistTestEvent("fifth flow-ID"); + + List events = findAllEventsInDB(); + assertThat(events, hasSize(5)); + EventLog notDeleted = events.remove(0); + + // now the actual test – delete just 4 of the 5 events from the DB + eventLogRepository.delete(events); + + List remaining = findAllEventsInDB(); + assertThat(remaining, hasSize(1)); + assertThat(remaining.get(0).getId(), is(notDeleted.getId())); + assertThat(remaining.get(0).getFlowId(), is(notDeleted.getFlowId())); + } + + private List findAllEventsInDB() { + return jdbcTemplate.query( + "SELECT * FROM nakadi_events.event_log", + new BeanPropertyRowMapper<>(EventLog.class)); + } + @Test public void testFindEventInRepositoryById() { Integer id = jdbcTemplate.queryForObject(