Skip to content

Commit

Permalink
Modified ZeroBufferTests to use MockitoExtension and addressed comments
Browse files Browse the repository at this point in the history
Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]>
  • Loading branch information
MohammedAghil committed Feb 7, 2025
1 parent 4488fb8 commit 07e2047
Showing 1 changed file with 49 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.core.pipeline.PipelineRunner;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
Expand All @@ -28,13 +33,19 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@ExtendWith(MockitoExtension.class)
public class ZeroBufferTests {
private static final Logger LOG = LoggerFactory.getLogger(ZeroBufferTests.class);
private static final String MOCK_PIPELINE_NAME = "mock-pipeline";
private static final int WRITE_TIMEOUT = 100;
private static final int READ_TIMEOUT = 500;
private static final String SINGLE_RECORD_DATA_FORMAT = "{\"message\":\"test\"}";
private static final String BATCH_RECORDS_DATA_FORMAT = "{\"message\":\"test-%d\"}";
@Mock
PipelineDescription pipelineDescription;
@Mock
PipelineRunner pipelineRunner;


@BeforeEach
public void setup() {
Expand All @@ -49,7 +60,8 @@ public void setup() {
class WriteTests {
@Test
public void testSingleWriteAndReadReturnsCorrectRecord() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);

Expand All @@ -61,11 +73,13 @@ public void testSingleWriteAndReadReturnsCorrectRecord() throws Exception {
zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
readRecords = zeroBuffer.read(READ_TIMEOUT).getKey();
assertEquals(1, readRecords.size());
verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks();
}

@Test
public void testMultipleWriteAndReadReturnsCorrectRecord() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
Expand All @@ -74,11 +88,13 @@ public void testMultipleWriteAndReadReturnsCorrectRecord() throws Exception {
assertEquals(2, readRecords.size());
assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData());
assertEquals(SINGLE_RECORD_DATA_FORMAT, readRecords.iterator().next().getData());
verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks();
}

@Test
public void testWriteAllAndReadReturnsAllRecords() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();

Collection<Record<String>> writeRecords = generateRecords(IntStream.range(0, 10)
.mapToObj(i -> String.format(BATCH_RECORDS_DATA_FORMAT, i))
Expand All @@ -93,11 +109,12 @@ public void testWriteAllAndReadReturnsAllRecords() throws Exception {

// Ensure that the write records are the same as the read records
assertEquals(writeRecords.size(), readRecords.size());
verify(pipelineRunner).runAllProcessorsAndPublishToSinks();
}

@Test
public void testWriteNullRecordThrowsException() {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();

Exception writeException = assertThrows(NullPointerException.class, () -> {
zeroBuffer.write(null, WRITE_TIMEOUT);
Expand All @@ -109,29 +126,33 @@ public void testWriteNullRecordThrowsException() {

assertEquals("The write record cannot be null", writeException.getMessage());
assertEquals("The write records cannot be null", writeAllException.getMessage());
verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks();
}

@Test
public void testWriteEmptyRecordDoesNotThrowException() {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();

Record<String> emptyRecord = generateRecord(null);
Collection<Record<String>> emptyRecordCollection = generateRecords(new ArrayList<>());

assertDoesNotThrow(() -> zeroBuffer.write(emptyRecord, WRITE_TIMEOUT));
assertDoesNotThrow(() -> zeroBuffer.writeAll(emptyRecordCollection, WRITE_TIMEOUT));
verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks();
}

@Test
public void testThreadReadAndWriteIsolation() throws Exception {
final ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName();
final ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTestWithPipelineName();

Thread workerThread = new Thread(() -> {
try {
PipelineRunner pipelineRunnerMock = mock(PipelineRunner.class);
zeroBuffer.setPipelineRunner(pipelineRunnerMock);
doNothing().when(pipelineRunnerMock).runAllProcessorsAndPublishToSinks();
zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
verify(pipelineRunnerMock).runAllProcessorsAndPublishToSinks();
} catch (TimeoutException e) {
fail("Timeout exception occurred");
}
Expand All @@ -142,11 +163,13 @@ public void testThreadReadAndWriteIsolation() throws Exception {
// Ensure that main thread does not share the same records store as the worker thread
assertEquals(0, zeroBuffer.read(READ_TIMEOUT).getKey().size());
assertTrue(zeroBuffer.isEmpty());
verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks();
}

@Test
public void testWriteAndWriteAllReturnsCorrectRecords() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
doNothing().when(pipelineRunner).runAllProcessorsAndPublishToSinks();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);
zeroBuffer.writeAll(generateRecords(IntStream.range(0, 10)
Expand All @@ -158,14 +181,15 @@ public void testWriteAndWriteAllReturnsCorrectRecords() throws Exception {
LOG.debug(record.getData());
}
assertEquals(11, readRecords.size());
verify(pipelineRunner, times(2)).runAllProcessorsAndPublishToSinks();
}
}

@Nested
class ReadTests {
@Test
public void testReadFromNonEmptyBufferReturnsCorrectRecords() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);

Expand All @@ -176,46 +200,50 @@ public void testReadFromNonEmptyBufferReturnsCorrectRecords() throws Exception {
assertEquals(SINGLE_RECORD_DATA_FORMAT, initialReadRecords.iterator().next().getData());

assertEquals(0, secondAttemptToReadRecords.size());
verify(pipelineRunner).runAllProcessorsAndPublishToSinks();
}

@Test
public void testReadFromEmptyBufferReturnsNoRecords() {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();

Map.Entry<Collection<Record<String>>, CheckpointState> readRecordsMap = zeroBuffer.read(READ_TIMEOUT);
assertTrue(readRecordsMap.getKey().isEmpty());
verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks();
}
}

@Nested
class EmptyBufferTests {
@Test
public void testIsEmptyReturnsTrueWhenBufferIsEmpty() {
ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTestWithPipelineName();
assertTrue(zeroBuffer.isEmpty());
verify(pipelineRunner, never()).runAllProcessorsAndPublishToSinks();
}

@Test
public void testIsEmptyReturnsFalseWhenBufferIsNotEmpty() throws Exception {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();

zeroBuffer.write(generateRecord(SINGLE_RECORD_DATA_FORMAT), WRITE_TIMEOUT);

assertFalse(zeroBuffer.isEmpty());
verify(pipelineRunner).runAllProcessorsAndPublishToSinks();
}
}

@Nested
class CommonTests {
@Test
public void testCreateZeroBufferWithPipelineName() {
ZeroBuffer<Record<String>> zeroBuffer = initializeZeroBufferWithPipelineName();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTestWithPipelineName();
assertEquals(MOCK_PIPELINE_NAME, zeroBuffer.pipelineName);
}

@Test
public void testCheckpointDoesNotThrowException() {
ZeroBuffer<Record<String>> zeroBuffer = setupAndInitializeZeroBuffer();
ZeroBuffer<Record<String>> zeroBuffer = createObjectUnderTest();
assertDoesNotThrow(() -> zeroBuffer.checkpoint(null));
assertDoesNotThrow(() -> zeroBuffer.checkpoint(new CheckpointState(0)));
}
Expand All @@ -235,16 +263,13 @@ private <T> Collection<Record<T>> generateRecords(Collection<T> data) {
return records;
}

private <T> ZeroBuffer<Record<T>> setupAndInitializeZeroBuffer() {
ZeroBuffer<Record<T>> zeroBuffer = initializeZeroBufferWithPipelineName();
PipelineRunner pipelineRunnerMock = mock(PipelineRunner.class);
zeroBuffer.setPipelineRunner(pipelineRunnerMock);
doNothing().when(pipelineRunnerMock).runAllProcessorsAndPublishToSinks();
private <T> ZeroBuffer<Record<T>> createObjectUnderTest() {
ZeroBuffer<Record<T>> zeroBuffer = createObjectUnderTestWithPipelineName();
zeroBuffer.setPipelineRunner(pipelineRunner);
return zeroBuffer;
}

private <T> ZeroBuffer<Record<T>> initializeZeroBufferWithPipelineName() {
PipelineDescription pipelineDescription = mock(PipelineDescription.class);
private <T> ZeroBuffer<Record<T>> createObjectUnderTestWithPipelineName() {
when(pipelineDescription.getPipelineName()).thenReturn(MOCK_PIPELINE_NAME);
return new ZeroBuffer<>(pipelineDescription);
}
Expand Down

0 comments on commit 07e2047

Please sign in to comment.