Skip to content

Commit

Permalink
fix(store): ignore corrupt operation when recovering (#579)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 6, 2023
1 parent 9ce85d9 commit a0460fa
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,9 @@ public static MessageExt convertTo(FlatMessageExt flatMessage, String topicName,
ByteBuffer payloadBuffer = flatMessage.message().payloadAsByteBuffer();

// Convert buffer to byte array
if (payloadBuffer.hasArray()) {
messageExt.setBody(payloadBuffer.array());
} else {
byte[] payload = new byte[payloadBuffer.remaining()];
payloadBuffer.get(payload);
messageExt.setBody(payload);
}
byte[] payload = new byte[payloadBuffer.remaining()];
payloadBuffer.get(payloadBuffer.position(), payload);
messageExt.setBody(payload);

SystemProperties systemProperties = flatMessage.message().systemProperties();
messageExt.setBornTimestamp(systemProperties.bornTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ public CompletableFuture<Void> recover(MessageStateMachine stateMachine, long op
.thenAccept(result -> {
// load operations
for (RecordBatchWithContext batchWithContext : result.recordBatchList()) {
// TODO: assume that a batch only contains one operation
Operation operation = SerializeUtil.decodeOperation(batchWithContext.rawPayload(), stateMachine,
operationStreamId, snapshotStreamId);
try {
// TODO: assume that a batch only contains one operation
Operation operation = SerializeUtil.decodeOperation(batchWithContext.rawPayload(), stateMachine,
operationStreamId, snapshotStreamId);

// TODO: operation may be null
replay(batchWithContext.baseOffset(), operation);
} catch (StoreException e) {
LOGGER.error("Topic {}, queue: {}: Replay operation: {} failed when recover", stateMachine.topicId(), stateMachine.queueId(), operation, e);
LOGGER.error("Topic {}, queue: {}, operation stream id: {}, offset: {}: replay operation failed when recover", stateMachine.topicId(), stateMachine.queueId(), operationStreamId, batchWithContext.baseOffset(), e);
if (e.code() != StoreErrorCode.ILLEGAL_ARGUMENT) {
throw new CompletionException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.automq.rocketmq.store.util;

import com.automq.rocketmq.store.api.MessageStateMachine;
import com.automq.rocketmq.store.exception.StoreErrorCode;
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.generated.CheckPoint;
import com.automq.rocketmq.store.model.generated.OperationLogItem;
import com.automq.rocketmq.store.model.generated.ReceiptHandle;
Expand Down Expand Up @@ -135,8 +137,8 @@ public static byte[] encodePopOperation(PopOperation popOperation) {
return builder.sizedByteArray();
}

public static Operation decodeOperation(ByteBuffer buffer,
MessageStateMachine stateMachine, long operationStreamId, long snapshotStreamId) {
public static Operation decodeOperation(ByteBuffer buffer, MessageStateMachine stateMachine, long operationStreamId,
long snapshotStreamId) throws StoreException {
OperationLogItem operationLogItem = OperationLogItem.getRootAsOperationLogItem(buffer);

switch (operationLogItem.operationType()) {
Expand Down Expand Up @@ -168,7 +170,7 @@ public static Operation decodeOperation(ByteBuffer buffer,
resetConsumeOffsetOperation.operationTimestamp());
}
default ->
throw new IllegalStateException("Unexpected operation type: " + operationLogItem.operationType());
throw new StoreException(StoreErrorCode.ILLEGAL_ARGUMENT, "Unexpected operation type: " + operationLogItem.operationType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.automq.rocketmq.store.util;

import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.generated.CheckPoint;
import com.automq.rocketmq.store.model.generated.ReceiptHandle;
import com.automq.rocketmq.store.model.operation.AckOperation.AckOperationType;
Expand Down Expand Up @@ -112,7 +113,7 @@ void decodeReceiptHandle() {
}

@Test
void encodePopOperation() {
void encodePopOperation() throws StoreException {
com.automq.rocketmq.store.model.operation.PopOperation popOperation = new com.automq.rocketmq.store.model.operation.PopOperation(
TOPIC_ID, QUEUE_ID, OPERATION_STREAM_ID, SNAPSHOT_STREAM_ID, null, CONSUMER_GROUP_ID, OFFSET,
BATCH_SIZE, INVISIBLE_DURATION, OPERATION_TIMESTAMP, IS_END_MARK, POP_OPERATION_TYPE
Expand All @@ -123,7 +124,7 @@ void encodePopOperation() {
}

@Test
void encodeAckOperation() {
void encodeAckOperation() throws StoreException {
com.automq.rocketmq.store.model.operation.AckOperation ackOperation = new com.automq.rocketmq.store.model.operation.AckOperation(
TOPIC_ID, QUEUE_ID, OPERATION_STREAM_ID, SNAPSHOT_STREAM_ID, null, CONSUMER_GROUP_ID,
OPERATION_ID, OPERATION_TIMESTAMP, ACK_OPERATION_TYPE
Expand All @@ -134,7 +135,7 @@ void encodeAckOperation() {
}

@Test
void encodeChangeInvisibleDurationOperation() {
void encodeChangeInvisibleDurationOperation() throws StoreException {
ChangeInvisibleDurationOperation changeInvisibleDurationOperation = new ChangeInvisibleDurationOperation(
TOPIC_ID, QUEUE_ID, OPERATION_STREAM_ID, SNAPSHOT_STREAM_ID, null, CONSUMER_GROUP_ID, OPERATION_ID, INVISIBLE_DURATION, OPERATION_TIMESTAMP
);
Expand All @@ -144,7 +145,7 @@ void encodeChangeInvisibleDurationOperation() {
}

@Test
void encodeResetConsumerOffset() {
void encodeResetConsumerOffset() throws StoreException {
ResetConsumeOffsetOperation resetConsumeOffsetOperation = new ResetConsumeOffsetOperation(
TOPIC_ID, QUEUE_ID, OPERATION_STREAM_ID, SNAPSHOT_STREAM_ID, null, CONSUMER_GROUP_ID, 0, OPERATION_TIMESTAMP
);
Expand Down

0 comments on commit a0460fa

Please sign in to comment.