From 07e204744183dc6c9660a10f335bd683869df88b Mon Sep 17 00:00:00 2001 From: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com> Date: Fri, 7 Feb 2025 21:05:59 +0000 Subject: [PATCH] Modified ZeroBufferTests to use MockitoExtension and addressed comments Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com> --- .../core/pipeline/buffer/ZeroBufferTests.java | 73 +++++++++++++------ 1 file changed, 49 insertions(+), 24 deletions(-) diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBufferTests.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBufferTests.java index 270ebf7625..653b2b4f8c 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBufferTests.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/buffer/ZeroBufferTests.java @@ -8,13 +8,18 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.core.pipeline.PipelineRunner; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.configuration.PipelineDescription; @@ -28,6 +33,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +@ExtendWith(MockitoExtension.class) public class ZeroBufferTests { private static final Logger LOG = LoggerFactory.getLogger(ZeroBufferTests.class); private static final String MOCK_PIPELINE_NAME = "mock-pipeline"; @@ -35,6 +41,11 @@ public class ZeroBufferTests { private static final int READ_TIMEOUT = 500; private static final String SINGLE_RECORD_DATA_FORMAT = "{\"message\":\"test\"}"; private static final String BATCH_RECORDS_DATA_FORMAT = "{\"message\":\"test-%d\"}"; + @Mock + PipelineDescription pipelineDescription; + @Mock + PipelineRunner pipelineRunner; + @BeforeEach public void setup() { @@ -49,7 +60,8 @@ public void setup() { class WriteTests { @Test public void testSingleWriteAndReadReturnsCorrectRecord() throws Exception { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); + doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks(); zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); @@ -61,11 +73,13 @@ public void testSingleWriteAndReadReturnsCorrectRecord() throws Exception { zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); readRecords = zeroBuffer.read(READ_TIMEOUT).getKey(); assertEquals(1, readRecords.size()); + verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks(); } @Test public void testMultipleWriteAndReadReturnsCorrectRecord() throws Exception { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); + doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks(); zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); @@ -74,11 +88,13 @@ public void testMultipleWriteAndReadReturnsCorrectRecord() throws Exception { assertEquals(2, readRecords.size()); assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData()); assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData()); + verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks(); } @Test public void testWriteAllAndReadReturnsAllRecords() throws Exception { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); + doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks(); Collection> writeRecords = generateRecords(IntStream.range(0, 10) .mapToObj(i -> String.format(BATCH_RECORDS_DATA_FORMAT, i)) @@ -93,11 +109,12 @@ public void testWriteAllAndReadReturnsAllRecords() throws Exception { // Ensure that the write records are the same as the read records assertEquals(writeRecords.size(), readRecords.size()); + verify(pipelineRunner).runAllProcessorsAndPublishToSinks(); } @Test public void testWriteNullRecordThrowsException() { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); Exception writeException = assertThrows(NullPointerException.class, () -> { zeroBuffer.write(null, WRITE_TIMEOUT); @@ -109,22 +126,25 @@ public void testWriteNullRecordThrowsException() { assertEquals("The write record cannot be null", writeException.getMessage()); assertEquals("The write records cannot be null", writeAllException.getMessage()); + verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks(); } @Test public void testWriteEmptyRecordDoesNotThrowException() { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); + doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks(); Record emptyRecord = generateRecord(null); Collection> emptyRecordCollection = generateRecords(new ArrayList<>()); assertDoesNotThrow(() -> zeroBuffer.write(emptyRecord, WRITE_TIMEOUT)); assertDoesNotThrow(() -> zeroBuffer.writeAll(emptyRecordCollection, WRITE_TIMEOUT)); + verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks(); } @Test public void testThreadReadAndWriteIsolation() throws Exception { - final ZeroBuffer> zeroBuffer = initializeZeroBufferWithPipelineName(); + final ZeroBuffer> zeroBuffer = createObjectUnderTestWithPipelineName(); Thread workerThread = new Thread(() -> { try { @@ -132,6 +152,7 @@ public void testThreadReadAndWriteIsolation() throws Exception { zeroBuffer.setPipelineRunner(pipelineRunnerMock); doNothing().when(pipelineRunnerMock).runAllProcessorsAndPublishToSinks(); zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); + verify(pipelineRunnerMock).runAllProcessorsAndPublishToSinks(); } catch (TimeoutException e) { fail("Timeout exception occurred"); } @@ -142,11 +163,13 @@ public void testThreadReadAndWriteIsolation() throws Exception { // Ensure that main thread does not share the same records store as the worker thread assertEquals(0, zeroBuffer.read(READ_TIMEOUT).getKey().size()); assertTrue(zeroBuffer.isEmpty()); + verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks(); } @Test public void testWriteAndWriteAllReturnsCorrectRecords() throws Exception { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); + doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks(); zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); zeroBuffer.writeAll(generateRecords(IntStream.range(0, 10) @@ -158,6 +181,7 @@ public void testWriteAndWriteAllReturnsCorrectRecords() throws Exception { LOG.debug(record.getData()); } assertEquals(11, readRecords.size()); + verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks(); } } @@ -165,7 +189,7 @@ public void testWriteAndWriteAllReturnsCorrectRecords() throws Exception { class ReadTests { @Test public void testReadFromNonEmptyBufferReturnsCorrectRecords() throws Exception { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); @@ -176,14 +200,16 @@ public void testReadFromNonEmptyBufferReturnsCorrectRecords() throws Exception { assertEquals(SINGLE_RECORD_DATA_FORMAT, initialReadRecords.iterator().next().getData()); assertEquals(0, secondAttemptToReadRecords.size()); + verify(pipelineRunner).runAllProcessorsAndPublishToSinks(); } @Test public void testReadFromEmptyBufferReturnsNoRecords() { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); Map.Entry>, CheckpointState> readRecordsMap = zeroBuffer.read(READ_TIMEOUT); assertTrue(readRecordsMap.getKey().isEmpty()); + verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks(); } } @@ -191,17 +217,19 @@ public void testReadFromEmptyBufferReturnsNoRecords() { class EmptyBufferTests { @Test public void testIsEmptyReturnsTrueWhenBufferIsEmpty() { - ZeroBuffer> zeroBuffer = initializeZeroBufferWithPipelineName(); + ZeroBuffer> zeroBuffer = createObjectUnderTestWithPipelineName(); assertTrue(zeroBuffer.isEmpty()); + verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks(); } @Test public void testIsEmptyReturnsFalseWhenBufferIsNotEmpty() throws Exception { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT); assertFalse(zeroBuffer.isEmpty()); + verify(pipelineRunner).runAllProcessorsAndPublishToSinks(); } } @@ -209,13 +237,13 @@ public void testIsEmptyReturnsFalseWhenBufferIsNotEmpty() throws Exception { class CommonTests { @Test public void testCreateZeroBufferWithPipelineName() { - ZeroBuffer> zeroBuffer = initializeZeroBufferWithPipelineName(); + ZeroBuffer> zeroBuffer = createObjectUnderTestWithPipelineName(); assertEquals(MOCK_PIPELINE_NAME, zeroBuffer.pipelineName); } @Test public void testCheckpointDoesNotThrowException() { - ZeroBuffer> zeroBuffer = setupAndInitializeZeroBuffer(); + ZeroBuffer> zeroBuffer = createObjectUnderTest(); assertDoesNotThrow(() -> zeroBuffer.checkpoint(null)); assertDoesNotThrow(() -> zeroBuffer.checkpoint(new CheckpointState(0))); } @@ -235,16 +263,13 @@ private Collection> generateRecords(Collection data) { return records; } - private ZeroBuffer> setupAndInitializeZeroBuffer() { - ZeroBuffer> zeroBuffer = initializeZeroBufferWithPipelineName(); - PipelineRunner pipelineRunnerMock = mock(PipelineRunner.class); - zeroBuffer.setPipelineRunner(pipelineRunnerMock); - doNothing().when(pipelineRunnerMock).runAllProcessorsAndPublishToSinks(); + private ZeroBuffer> createObjectUnderTest() { + ZeroBuffer> zeroBuffer = createObjectUnderTestWithPipelineName(); + zeroBuffer.setPipelineRunner(pipelineRunner); return zeroBuffer; } - private ZeroBuffer> initializeZeroBufferWithPipelineName() { - PipelineDescription pipelineDescription = mock(PipelineDescription.class); + private ZeroBuffer> createObjectUnderTestWithPipelineName() { when(pipelineDescription.getPipelineName()).thenReturn(MOCK_PIPELINE_NAME); return new ZeroBuffer<>(pipelineDescription); }