Skip to content

Commit

Permalink
feat: support for unified date management of metadata (#484)
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye authored Oct 27, 2023
1 parent 8381020 commit db1af30
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@
public class S3Constants {

public static final int NOOP_OBJECT_ID = -1;

public static final int NOOP_OBJECT_COMMIT_TIMESTAMP = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,

if (!s3WalObjects.isEmpty()) {
// update dataTs to the min compacted object's dataTs
dataTs = s3WalObjects.stream().mapToLong(S3WalObject::getBaseDataTimestamp).min().getAsLong();
dataTs = s3WalObjects.stream()
.map(S3WalObject::getBaseDataTimestamp)
.map(Date::getTime)
.min(Long::compareTo).get();
// update sequenceId to the min compacted object's sequenceId
sequenceId = s3WalObjects.stream().mapToLong(S3WalObject::getSequenceId).min().getAsLong();
}
Expand All @@ -222,9 +225,9 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject();
object.setStreamId(s3StreamObject.getStreamId());
object.setObjectId(s3StreamObject.getObjectId());
object.setCommittedTimestamp(System.currentTimeMillis());
object.setCommittedTimestamp(new Date());
object.setStartOffset(s3StreamObject.getStartOffset());
object.setBaseDataTimestamp(s3StreamObject.getBaseDataTimestamp());
object.setBaseDataTimestamp(new Date(s3StreamObject.getBaseDataTimestamp()));
object.setEndOffset(s3StreamObject.getEndOffset());
object.setObjectSize(s3StreamObject.getObjectSize());
s3StreamObjectMapper.commit(object);
Expand All @@ -242,8 +245,8 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
S3WalObject s3WALObject = new S3WalObject();
s3WALObject.setObjectId(objectId);
s3WALObject.setObjectSize(walObject.getObjectSize());
s3WALObject.setBaseDataTimestamp(dataTs);
s3WALObject.setCommittedTimestamp(System.currentTimeMillis());
s3WALObject.setBaseDataTimestamp(new Date(dataTs));
s3WALObject.setCommittedTimestamp(new Date());
s3WALObject.setNodeId(brokerId);
s3WALObject.setSequenceId(sequenceId);
s3WALObject.setSubStreams(gson.toJson(walObject.getSubStreamsMap()));
Expand Down Expand Up @@ -320,7 +323,7 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.

// update dataTs to the min compacted object's dataTs
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject s3StreamObject = s3StreamObjectMapper.getByObjectId(id);
return s3StreamObject.getBaseDataTimestamp();
return s3StreamObject.getBaseDataTimestamp().getTime();
})
.min(Long::compareTo).get();
}
Expand All @@ -332,8 +335,8 @@ public CompletableFuture<Void> commitStreamObject(apache.rocketmq.controller.v1.
newS3StreamObj.setObjectSize(streamObject.getObjectSize());
newS3StreamObj.setStartOffset(streamObject.getStartOffset());
newS3StreamObj.setEndOffset(streamObject.getEndOffset());
newS3StreamObj.setBaseDataTimestamp(dataTs);
newS3StreamObj.setCommittedTimestamp(committedTs);
newS3StreamObj.setBaseDataTimestamp(new Date(dataTs));
newS3StreamObj.setCommittedTimestamp(new Date(committedTs));
s3StreamObjectMapper.create(newS3StreamObj);
}

Expand Down Expand Up @@ -441,21 +444,26 @@ public CompletableFuture<List<apache.rocketmq.controller.v1.S3StreamObject>> lis
try (SqlSession session = metadataStore.openSession()) {
S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class);
List<apache.rocketmq.controller.v1.S3StreamObject> streamObjects = s3StreamObjectMapper.list(null, streamId, startOffset, endOffset, limit).stream()
.map(streamObject -> apache.rocketmq.controller.v1.S3StreamObject.newBuilder()
.setStreamId(streamObject.getStreamId())
.setObjectSize(streamObject.getObjectSize())
.setObjectId(streamObject.getObjectId())
.setStartOffset(streamObject.getStartOffset())
.setEndOffset(streamObject.getEndOffset())
.setBaseDataTimestamp(streamObject.getBaseDataTimestamp())
.setCommittedTimestamp(streamObject.getCommittedTimestamp())
.build())
.map(this::buildS3StreamObject)
.toList();
future.complete(streamObjects);
}
return future;
}

private S3StreamObject buildS3StreamObject(
com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject originalObject) {
return S3StreamObject.newBuilder()
.setStreamId(originalObject.getStreamId())
.setObjectSize(originalObject.getObjectSize())
.setObjectId(originalObject.getObjectId())
.setStartOffset(originalObject.getStartOffset())
.setEndOffset(originalObject.getEndOffset())
.setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime())
.setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
.build();
}

private S3WALObject buildS3WALObject(
S3WalObject originalObject,
Map<Long, SubStream> subStreams) {
Expand All @@ -464,8 +472,8 @@ private S3WALObject buildS3WALObject(
.setObjectSize(originalObject.getObjectSize())
.setBrokerId(originalObject.getNodeId())
.setSequenceId(originalObject.getSequenceId())
.setBaseDataTimestamp(originalObject.getBaseDataTimestamp())
.setCommittedTimestamp(originalObject.getCommittedTimestamp())
.setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime())
.setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
.putAllSubStreams(subStreams)
.build();
}
Expand Down Expand Up @@ -544,15 +552,7 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<apache.rocketmq.controller.v1.S3StreamObject> s3StreamObjects = s3StreamObjectMapper.list(null, streamId, startOffset, endOffset, limit)
.stream()
.map(streamObject -> apache.rocketmq.controller.v1.S3StreamObject.newBuilder()
.setStreamId(streamObject.getStreamId())
.setObjectSize(streamObject.getObjectSize())
.setObjectId(streamObject.getObjectId())
.setStartOffset(streamObject.getStartOffset())
.setEndOffset(streamObject.getEndOffset())
.setBaseDataTimestamp(streamObject.getBaseDataTimestamp())
.setCommittedTimestamp(streamObject.getCommittedTimestamp())
.build())
.map(this::buildS3StreamObject)
.toList();

List<S3WALObject> walObjects = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.automq.rocketmq.controller.metadata.database.dao;

import java.util.Date;
import java.util.Objects;

public class S3StreamObject {
Expand All @@ -34,11 +35,11 @@ public class S3StreamObject {

private Long objectSize;

private long baseDataTimestamp;
private Date baseDataTimestamp;

private long committedTimestamp;
private Date committedTimestamp;

private long createdTimestamp = System.currentTimeMillis();
private Date createdTimestamp = new Date();

public Long getId() {
return id;
Expand Down Expand Up @@ -88,45 +89,40 @@ public void setObjectSize(Long objectSize) {
this.objectSize = objectSize;
}

public long getBaseDataTimestamp() {
public Date getBaseDataTimestamp() {
return baseDataTimestamp;
}

public void setBaseDataTimestamp(long baseDataTimestamp) {
public void setBaseDataTimestamp(Date baseDataTimestamp) {
this.baseDataTimestamp = baseDataTimestamp;
}

public long getCommittedTimestamp() {
public Date getCommittedTimestamp() {
return committedTimestamp;
}

public void setCommittedTimestamp(long committedTimestamp) {
public void setCommittedTimestamp(Date committedTimestamp) {
this.committedTimestamp = committedTimestamp;
}

public long getCreatedTimestamp() {
public Date getCreatedTimestamp() {
return createdTimestamp;
}

public void setCreatedTimestamp(long createdTimestamp) {
public void setCreatedTimestamp(Date createdTimestamp) {
this.createdTimestamp = createdTimestamp;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
S3StreamObject object = (S3StreamObject) o;
return baseDataTimestamp == object.baseDataTimestamp && committedTimestamp == object.committedTimestamp
&& Objects.equals(id, object.id) && Objects.equals(objectId, object.objectId)
&& Objects.equals(streamId, object.streamId) && Objects.equals(startOffset, object.startOffset)
&& Objects.equals(endOffset, object.endOffset) && Objects.equals(objectSize, object.objectSize);
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
S3StreamObject that = (S3StreamObject) o;
return Objects.equals(id, that.id) && Objects.equals(objectId, that.objectId) && Objects.equals(streamId, that.streamId) && Objects.equals(startOffset, that.startOffset) && Objects.equals(endOffset, that.endOffset) && Objects.equals(objectSize, that.objectSize) && Objects.equals(baseDataTimestamp, that.baseDataTimestamp) && Objects.equals(committedTimestamp, that.committedTimestamp) && Objects.equals(createdTimestamp, that.createdTimestamp);
}

@Override
public int hashCode() {
return Objects.hash(id, objectId, streamId, startOffset, endOffset, objectSize, baseDataTimestamp, committedTimestamp);
return Objects.hash(id, objectId, streamId, startOffset, endOffset, objectSize, baseDataTimestamp, committedTimestamp, createdTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.automq.rocketmq.controller.metadata.database.dao;

import java.util.Date;
import java.util.Objects;

public class S3WalObject {
Expand All @@ -32,11 +33,11 @@ public class S3WalObject {

String subStreams;

long baseDataTimestamp;
Date baseDataTimestamp;

long committedTimestamp;
Date committedTimestamp;

long createdTimestamp = System.currentTimeMillis();
Date createdTimestamp = new Date();

public Long getObjectId() {
return objectId;
Expand Down Expand Up @@ -78,45 +79,40 @@ public void setSubStreams(String subStreams) {
this.subStreams = subStreams;
}

public long getBaseDataTimestamp() {
public Date getBaseDataTimestamp() {
return baseDataTimestamp;
}

public void setBaseDataTimestamp(long baseDataTimestamp) {
public void setBaseDataTimestamp(Date baseDataTimestamp) {
this.baseDataTimestamp = baseDataTimestamp;
}

public long getCommittedTimestamp() {
public Date getCommittedTimestamp() {
return committedTimestamp;
}

public void setCommittedTimestamp(long committedTimestamp) {
public void setCommittedTimestamp(Date committedTimestamp) {
this.committedTimestamp = committedTimestamp;
}

public long getCreatedTimestamp() {
public Date getCreatedTimestamp() {
return createdTimestamp;
}

public void setCreatedTimestamp(long createdTimestamp) {
public void setCreatedTimestamp(Date createdTimestamp) {
this.createdTimestamp = createdTimestamp;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
S3WalObject object = (S3WalObject) o;
return baseDataTimestamp == object.baseDataTimestamp && committedTimestamp == object.committedTimestamp
&& objectId.equals(object.objectId) && nodeId.equals(object.nodeId)
&& objectSize.equals(object.objectSize) && sequenceId.equals(object.sequenceId)
&& Objects.equals(subStreams, object.subStreams);
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
S3WalObject that = (S3WalObject) o;
return Objects.equals(objectId, that.objectId) && Objects.equals(nodeId, that.nodeId) && Objects.equals(objectSize, that.objectSize) && Objects.equals(sequenceId, that.sequenceId) && Objects.equals(subStreams, that.subStreams) && Objects.equals(baseDataTimestamp, that.baseDataTimestamp) && Objects.equals(committedTimestamp, that.committedTimestamp) && Objects.equals(createdTimestamp, that.createdTimestamp);
}

@Override
public int hashCode() {
return Objects.hash(objectId, nodeId, objectSize, sequenceId, subStreams, baseDataTimestamp, committedTimestamp);
return Objects.hash(objectId, nodeId, objectSize, sequenceId, subStreams, baseDataTimestamp, committedTimestamp, createdTimestamp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject;
import org.apache.ibatis.annotations.Param;

import java.util.Date;
import java.util.List;

public interface S3StreamObjectMapper {
Expand Down Expand Up @@ -48,5 +49,5 @@ List<S3StreamObject> list(@Param("objectId") Long objectId,

int batchDelete(@Param("objectIds") List<Long> objectIds);

List<Long> recyclable(@Param("streamIds") List<Long> streamIds, @Param("threshold") long threshold);
List<Long> recyclable(@Param("streamIds") List<Long> streamIds, @Param("threshold") Date threshold);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.automq.rocketmq.controller.metadata.database.mapper.TopicMapper;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import org.apache.ibatis.session.SqlSession;

Expand Down Expand Up @@ -55,7 +56,7 @@ public void process() throws ControllerException {
for (Topic topic : topics) {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.HOUR, -topic.getRetentionHours());
long threshold = calendar.getTimeInMillis();
Date threshold = calendar.getTime();
List<Long> streamIds = streamMapper.list(topic.getId(), null, null)
.stream()
.map(Stream::getId)
Expand Down
12 changes: 6 additions & 6 deletions controller/src/main/resources/ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ CREATE TABLE IF NOT EXISTS s3streamobject
end_offset BIGINT NOT NULL,
object_id BIGINT NOT NULL,
object_size BIGINT NOT NULL,
base_data_timestamp BIGINT,
committed_timestamp BIGINT,
created_timestamp BIGINT,
base_data_timestamp TIMESTAMP(3),
committed_timestamp TIMESTAMP(3),
created_timestamp TIMESTAMP(3),
UNIQUE INDEX uk_s3_stream_object_object_id (object_id),
INDEX idx_s3_stream_object_stream_id (stream_id, start_offset)
);
Expand All @@ -163,9 +163,9 @@ CREATE TABLE IF NOT EXISTS s3walobject
node_id INT NOT NULL,
sequence_id BIGINT NOT NULL,
sub_streams LONGTEXT NOT NULL, -- immutable
base_data_timestamp BIGINT,
committed_timestamp BIGINT,
created_timestamp BIGINT,
base_data_timestamp TIMESTAMP(3),
committed_timestamp TIMESTAMP(3),
created_timestamp TIMESTAMP(3),
UNIQUE INDEX uk_s3_wal_object_node_sequence_id (node_id, sequence_id),
INDEX idx_s3_wal_object_object_id (object_id)
);
Expand Down
Loading

0 comments on commit db1af30

Please sign in to comment.