diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java index db5ba85d7ba8..411352813396 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -115,7 +116,23 @@ public synchronized void close() { events.clear(); } - public void decreaseEventsReferenceCount(final String holderMessage, final boolean shouldReport) { + /** + * Discard all events of the given pipe. This method only clears the reference count of the events + * and discard them, but do not modify other objects (such as buffers) for simplicity. + */ + public synchronized void discardEventsOfPipe(final String pipeNameToDrop) { + events.removeIf( + event -> { + if (pipeNameToDrop.equals(event.getPipeName())) { + event.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName()); + return true; + } + return false; + }); + } + + public synchronized void decreaseEventsReferenceCount( + final String holderMessage, final boolean shouldReport) { events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index f28f19e4da11..77fd73f8abab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -184,6 +184,11 @@ public boolean isEmpty() { && endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty); } + public synchronized void discardEventsOfPipe(final String pipeNameToDrop) { + defaultBatch.discardEventsOfPipe(pipeNameToDrop); + endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop)); + } + @Override public synchronized void close() { defaultBatch.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 43c5fa6eff2d..3f37b2478697 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -515,11 +515,9 @@ public synchronized void clearRetryEventsReferenceCount() { //////////////////////////// Operations for close //////////////////////////// - /** - * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard - * its queued events in the output pipe connector. - */ + @Override public synchronized void discardEventsOfPipe(final String pipeNameToDrop) { + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop); retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index fca40f21d647..a92790c5e805 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -492,6 +492,11 @@ private void doTransfer( LOGGER.info("Successfully transferred file {}.", tsFile); } + @Override + public synchronized void discardEventsOfPipe(final String pipeNameToDrop) { + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop); + } + @Override public void close() { if (tabletBatchBuilder != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java index cc78ebdea54c..a832d43f73e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue; import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask; @@ -268,8 +269,8 @@ public void discardEventsOfPipe(final String pipeNameToDrop) { } } - if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) { - ((IoTDBDataRegionAsyncConnector) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop); + if (outputPipeConnector instanceof IoTDBConnector) { + ((IoTDBConnector) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java index a28a0289b08d..e3d4d35a1711 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java @@ -89,7 +89,7 @@ public synchronized void register() { * the {@link PipeConnectorSubtask} should never be used again * @throws IllegalStateException if {@link PipeConnectorSubtaskLifeCycle#registeredTaskCount} <= 0 */ - public synchronized boolean deregister(String pipeNameToDeregister) { + public synchronized boolean deregister(final String pipeNameToDeregister) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index 9dc08f934026..55231032e5aa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -452,6 +452,14 @@ public void rateLimitIfNeeded( GLOBAL_RATE_LIMITER.acquire(bytesLength); } + /** + * When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard + * its batched or queued events in the output pipe connector. + */ + public synchronized void discardEventsOfPipe(final String pipeName) { + // Do nothing by default + } + public PipeReceiverStatusHandler statusHandler() { return receiverStatusHandler; }