Skip to content

Commit

Permalink
refactor(store): refactor MessageStateMachine (#849)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Dec 21, 2023
1 parent fd9dbe3 commit 78dce84
Show file tree
Hide file tree
Showing 14 changed files with 187 additions and 365 deletions.
3 changes: 0 additions & 3 deletions store/src/main/fbs/store.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,8 @@ table ConsumerGroupMetadata {
consumer_group_id:long;
consume_offset:long;
ack_offset:long;
ack_bit_map:[byte];
retry_consume_offset:long;
retry_ack_offset:long;
retry_ack_bit_map:[byte];
consume_times:[ConsumeTimes];
// for identify the consumer group metadata version, usually it is the first operation id when the consumer group metadata is created or reset
version: long;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public interface MessageStateMachine {

int consumeTimes(long consumerGroupId, long offset);

void registerAckOffsetListener(OffsetListener listener);

void registerRetryAckOffsetListener(OffsetListener listener);

class ReplayPopResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,27 @@
package com.automq.rocketmq.store.model.metadata;

import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConsumerGroupMetadata {
private final long consumerGroupId;
private long consumeOffset;
private long ackOffset;
private long retryConsumeOffset;
private long retryAckOffset;
private final ConcurrentSkipListMap<Long/*offset*/, Integer/*times*/> consumeTimes;
private final long version;

public ConsumerGroupMetadata(long consumerGroupId) {
this.consumerGroupId = consumerGroupId;
this.consumeTimes = new ConcurrentSkipListMap<>();
this.version = 0;
}

public ConsumerGroupMetadata(long consumerGroupId, long consumeOffset, long ackOffset, long retryConsumeOffset,
long retryAckOffset, ConcurrentSkipListMap<Long, Integer> consumeTimes,
long version) {
long retryAckOffset, long version) {
this.consumerGroupId = consumerGroupId;
this.consumeOffset = consumeOffset;
this.ackOffset = ackOffset;
this.retryConsumeOffset = retryConsumeOffset;
this.retryAckOffset = retryAckOffset;
this.consumeTimes = consumeTimes;
this.version = version;
}

Expand Down Expand Up @@ -71,22 +66,22 @@ public void setConsumeOffset(long consumeOffset) {
this.consumeOffset = consumeOffset;
}

public void setAckOffset(long ackOffset) {
this.ackOffset = ackOffset;
// when ack offset is updated, we should clear the consume times
this.consumeTimes.subMap(0L, ackOffset).clear();
public void advanceAckOffset(long ackOffset) {
if (ackOffset > this.ackOffset) {
this.ackOffset = ackOffset;
}
}

public void setRetryConsumeOffset(long retryConsumeOffset) {
this.retryConsumeOffset = retryConsumeOffset;
}

public void setRetryAckOffset(long retryAckOffset) {
this.retryAckOffset = retryAckOffset;
public void advanceRetryConsumeOffset(long retryConsumeOffset) {
if (retryConsumeOffset > this.retryConsumeOffset) {
this.retryConsumeOffset = retryConsumeOffset;
}
}

public ConcurrentSkipListMap<Long, Integer> getConsumeTimes() {
return consumeTimes;
public void advanceRetryAckOffset(long retryAckOffset) {
if (retryAckOffset > this.retryAckOffset) {
this.retryAckOffset = retryAckOffset;
}
}

public long getVersion() {
Expand All @@ -100,11 +95,11 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass())
return false;
ConsumerGroupMetadata metadata = (ConsumerGroupMetadata) o;
return consumerGroupId == metadata.consumerGroupId && consumeOffset == metadata.consumeOffset && ackOffset == metadata.ackOffset && retryConsumeOffset == metadata.retryConsumeOffset && retryAckOffset == metadata.retryAckOffset && version == metadata.version && Objects.equals(consumeTimes, metadata.consumeTimes);
return consumerGroupId == metadata.consumerGroupId && consumeOffset == metadata.consumeOffset && ackOffset == metadata.ackOffset && retryConsumeOffset == metadata.retryConsumeOffset && retryAckOffset == metadata.retryAckOffset && version == metadata.version;
}

@Override
public int hashCode() {
return Objects.hash(consumerGroupId, consumeOffset, ackOffset, retryConsumeOffset, retryAckOffset, consumeTimes, version);
return Objects.hash(consumerGroupId, consumeOffset, ackOffset, retryConsumeOffset, retryAckOffset, version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,23 @@
import com.automq.rocketmq.store.model.metadata.ConsumerGroupMetadata;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;

public class OperationSnapshot {

private final long snapshotEndOffset;
private List<CheckPoint> checkPoints;
private long kvServiceSnapshotVersion;
private final List<ConsumerGroupMetadataSnapshot> consumerGroupMetadataList;
private final List<ConsumerGroupMetadata> consumerGroupMetadataList;

public OperationSnapshot(long snapshotEndOffset, long kvServiceSnapshotVersion, List<ConsumerGroupMetadataSnapshot> consumerGroupMetadataList) {
public OperationSnapshot(long snapshotEndOffset, long kvServiceSnapshotVersion,
List<ConsumerGroupMetadata> consumerGroupMetadataList) {
this.snapshotEndOffset = snapshotEndOffset;
this.kvServiceSnapshotVersion = kvServiceSnapshotVersion;
this.consumerGroupMetadataList = consumerGroupMetadataList;
}

public OperationSnapshot(long snapshotEndOffset, List<ConsumerGroupMetadataSnapshot> consumerGroupMetadataList, List<CheckPoint> checkPoints) {
public OperationSnapshot(long snapshotEndOffset, List<ConsumerGroupMetadata> consumerGroupMetadataList,
List<CheckPoint> checkPoints) {
this.snapshotEndOffset = snapshotEndOffset;
this.consumerGroupMetadataList = consumerGroupMetadataList;
this.checkPoints = checkPoints;
Expand All @@ -58,7 +59,7 @@ public long getKvServiceSnapshotVersion() {
return kvServiceSnapshotVersion;
}

public List<ConsumerGroupMetadataSnapshot> getConsumerGroupMetadataList() {
public List<ConsumerGroupMetadata> getConsumerGroupMetadataList() {
return consumerGroupMetadataList;
}

Expand All @@ -84,28 +85,4 @@ public String toString() {
'}';
}

public static class ConsumerGroupMetadataSnapshot extends ConsumerGroupMetadata {
private final byte[] ackOffsetBitmapBuffer;
private final byte[] retryAckOffsetBitmapBuffer;

public ConsumerGroupMetadataSnapshot(long consumerGroupId, long consumeOffset, long ackOffset,
long retryConsumeOffset, long retryAckOffset,
byte[] ackOffsetBitmapBuffer, byte[] retryAckOffsetBitmapBuffer,
ConcurrentSkipListMap<Long, Integer> consumeTimes,
long version) {
super(consumerGroupId, consumeOffset, ackOffset, retryConsumeOffset, retryAckOffset, consumeTimes, version);
this.ackOffsetBitmapBuffer = ackOffsetBitmapBuffer;
this.retryAckOffsetBitmapBuffer = retryAckOffsetBitmapBuffer;
}

public byte[] getAckOffsetBitmapBuffer() {
return ackOffsetBitmapBuffer;
}

public byte[] getRetryAckOffsetBitmapBuffer() {
return retryAckOffsetBitmapBuffer;
}

}

}
Loading

0 comments on commit 78dce84

Please sign in to comment.