diff --git a/common/src/main/java/com/automq/rocketmq/common/system/S3Constants.java b/common/src/main/java/com/automq/rocketmq/common/system/S3Constants.java index bfc33a3c9..182202aaf 100644 --- a/common/src/main/java/com/automq/rocketmq/common/system/S3Constants.java +++ b/common/src/main/java/com/automq/rocketmq/common/system/S3Constants.java @@ -21,4 +21,6 @@ public class S3Constants { public static final int NOOP_OBJECT_ID = -1; + + public static final int NOOP_OBJECT_COMMIT_TIMESTAMP = 0; } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java index 5bbc9b100..666c356ba 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/S3MetadataManager.java @@ -199,7 +199,10 @@ public CompletableFuture commitWalObject(S3WALObject walObject, if (!s3WalObjects.isEmpty()) { // update dataTs to the min compacted object's dataTs - dataTs = s3WalObjects.stream().mapToLong(S3WalObject::getBaseDataTimestamp).min().getAsLong(); + dataTs = s3WalObjects.stream() + .map(S3WalObject::getBaseDataTimestamp) + .map(Date::getTime) + .min(Long::compareTo).get(); // update sequenceId to the min compacted object's sequenceId sequenceId = s3WalObjects.stream().mapToLong(S3WalObject::getSequenceId).min().getAsLong(); } @@ -222,9 +225,9 @@ public CompletableFuture commitWalObject(S3WALObject walObject, com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = new com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject(); object.setStreamId(s3StreamObject.getStreamId()); object.setObjectId(s3StreamObject.getObjectId()); - object.setCommittedTimestamp(System.currentTimeMillis()); + object.setCommittedTimestamp(new Date()); object.setStartOffset(s3StreamObject.getStartOffset()); - object.setBaseDataTimestamp(s3StreamObject.getBaseDataTimestamp()); + object.setBaseDataTimestamp(new Date(s3StreamObject.getBaseDataTimestamp())); object.setEndOffset(s3StreamObject.getEndOffset()); object.setObjectSize(s3StreamObject.getObjectSize()); s3StreamObjectMapper.commit(object); @@ -242,8 +245,8 @@ public CompletableFuture commitWalObject(S3WALObject walObject, S3WalObject s3WALObject = new S3WalObject(); s3WALObject.setObjectId(objectId); s3WALObject.setObjectSize(walObject.getObjectSize()); - s3WALObject.setBaseDataTimestamp(dataTs); - s3WALObject.setCommittedTimestamp(System.currentTimeMillis()); + s3WALObject.setBaseDataTimestamp(new Date(dataTs)); + s3WALObject.setCommittedTimestamp(new Date()); s3WALObject.setNodeId(brokerId); s3WALObject.setSequenceId(sequenceId); s3WALObject.setSubStreams(gson.toJson(walObject.getSubStreamsMap())); @@ -320,7 +323,7 @@ public CompletableFuture commitStreamObject(apache.rocketmq.controller.v1. // update dataTs to the min compacted object's dataTs com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject s3StreamObject = s3StreamObjectMapper.getByObjectId(id); - return s3StreamObject.getBaseDataTimestamp(); + return s3StreamObject.getBaseDataTimestamp().getTime(); }) .min(Long::compareTo).get(); } @@ -332,8 +335,8 @@ public CompletableFuture commitStreamObject(apache.rocketmq.controller.v1. newS3StreamObj.setObjectSize(streamObject.getObjectSize()); newS3StreamObj.setStartOffset(streamObject.getStartOffset()); newS3StreamObj.setEndOffset(streamObject.getEndOffset()); - newS3StreamObj.setBaseDataTimestamp(dataTs); - newS3StreamObj.setCommittedTimestamp(committedTs); + newS3StreamObj.setBaseDataTimestamp(new Date(dataTs)); + newS3StreamObj.setCommittedTimestamp(new Date(committedTs)); s3StreamObjectMapper.create(newS3StreamObj); } @@ -441,21 +444,26 @@ public CompletableFuture> lis try (SqlSession session = metadataStore.openSession()) { S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); List streamObjects = s3StreamObjectMapper.list(null, streamId, startOffset, endOffset, limit).stream() - .map(streamObject -> apache.rocketmq.controller.v1.S3StreamObject.newBuilder() - .setStreamId(streamObject.getStreamId()) - .setObjectSize(streamObject.getObjectSize()) - .setObjectId(streamObject.getObjectId()) - .setStartOffset(streamObject.getStartOffset()) - .setEndOffset(streamObject.getEndOffset()) - .setBaseDataTimestamp(streamObject.getBaseDataTimestamp()) - .setCommittedTimestamp(streamObject.getCommittedTimestamp()) - .build()) + .map(this::buildS3StreamObject) .toList(); future.complete(streamObjects); } return future; } + private S3StreamObject buildS3StreamObject( + com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject originalObject) { + return S3StreamObject.newBuilder() + .setStreamId(originalObject.getStreamId()) + .setObjectSize(originalObject.getObjectSize()) + .setObjectId(originalObject.getObjectId()) + .setStartOffset(originalObject.getStartOffset()) + .setEndOffset(originalObject.getEndOffset()) + .setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime()) + .setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP) + .build(); + } + private S3WALObject buildS3WALObject( S3WalObject originalObject, Map subStreams) { @@ -464,8 +472,8 @@ private S3WALObject buildS3WALObject( .setObjectSize(originalObject.getObjectSize()) .setBrokerId(originalObject.getNodeId()) .setSequenceId(originalObject.getSequenceId()) - .setBaseDataTimestamp(originalObject.getBaseDataTimestamp()) - .setCommittedTimestamp(originalObject.getCommittedTimestamp()) + .setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime()) + .setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP) .putAllSubStreams(subStreams) .build(); } @@ -544,15 +552,7 @@ public CompletableFuture, List>> listObje S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class); List s3StreamObjects = s3StreamObjectMapper.list(null, streamId, startOffset, endOffset, limit) .stream() - .map(streamObject -> apache.rocketmq.controller.v1.S3StreamObject.newBuilder() - .setStreamId(streamObject.getStreamId()) - .setObjectSize(streamObject.getObjectSize()) - .setObjectId(streamObject.getObjectId()) - .setStartOffset(streamObject.getStartOffset()) - .setEndOffset(streamObject.getEndOffset()) - .setBaseDataTimestamp(streamObject.getBaseDataTimestamp()) - .setCommittedTimestamp(streamObject.getCommittedTimestamp()) - .build()) + .map(this::buildS3StreamObject) .toList(); List walObjects = new ArrayList<>(); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/S3StreamObject.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/S3StreamObject.java index c9e411eee..524ec0757 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/S3StreamObject.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/S3StreamObject.java @@ -18,6 +18,7 @@ package com.automq.rocketmq.controller.metadata.database.dao; +import java.util.Date; import java.util.Objects; public class S3StreamObject { @@ -34,11 +35,11 @@ public class S3StreamObject { private Long objectSize; - private long baseDataTimestamp; + private Date baseDataTimestamp; - private long committedTimestamp; + private Date committedTimestamp; - private long createdTimestamp = System.currentTimeMillis(); + private Date createdTimestamp = new Date(); public Long getId() { return id; @@ -88,45 +89,40 @@ public void setObjectSize(Long objectSize) { this.objectSize = objectSize; } - public long getBaseDataTimestamp() { + public Date getBaseDataTimestamp() { return baseDataTimestamp; } - public void setBaseDataTimestamp(long baseDataTimestamp) { + public void setBaseDataTimestamp(Date baseDataTimestamp) { this.baseDataTimestamp = baseDataTimestamp; } - public long getCommittedTimestamp() { + public Date getCommittedTimestamp() { return committedTimestamp; } - public void setCommittedTimestamp(long committedTimestamp) { + public void setCommittedTimestamp(Date committedTimestamp) { this.committedTimestamp = committedTimestamp; } - public long getCreatedTimestamp() { + public Date getCreatedTimestamp() { return createdTimestamp; } - public void setCreatedTimestamp(long createdTimestamp) { + public void setCreatedTimestamp(Date createdTimestamp) { this.createdTimestamp = createdTimestamp; } @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - S3StreamObject object = (S3StreamObject) o; - return baseDataTimestamp == object.baseDataTimestamp && committedTimestamp == object.committedTimestamp - && Objects.equals(id, object.id) && Objects.equals(objectId, object.objectId) - && Objects.equals(streamId, object.streamId) && Objects.equals(startOffset, object.startOffset) - && Objects.equals(endOffset, object.endOffset) && Objects.equals(objectSize, object.objectSize); + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + S3StreamObject that = (S3StreamObject) o; + return Objects.equals(id, that.id) && Objects.equals(objectId, that.objectId) && Objects.equals(streamId, that.streamId) && Objects.equals(startOffset, that.startOffset) && Objects.equals(endOffset, that.endOffset) && Objects.equals(objectSize, that.objectSize) && Objects.equals(baseDataTimestamp, that.baseDataTimestamp) && Objects.equals(committedTimestamp, that.committedTimestamp) && Objects.equals(createdTimestamp, that.createdTimestamp); } @Override public int hashCode() { - return Objects.hash(id, objectId, streamId, startOffset, endOffset, objectSize, baseDataTimestamp, committedTimestamp); + return Objects.hash(id, objectId, streamId, startOffset, endOffset, objectSize, baseDataTimestamp, committedTimestamp, createdTimestamp); } } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/S3WalObject.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/S3WalObject.java index 06185e087..bf7d2076e 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/S3WalObject.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/dao/S3WalObject.java @@ -18,6 +18,7 @@ package com.automq.rocketmq.controller.metadata.database.dao; +import java.util.Date; import java.util.Objects; public class S3WalObject { @@ -32,11 +33,11 @@ public class S3WalObject { String subStreams; - long baseDataTimestamp; + Date baseDataTimestamp; - long committedTimestamp; + Date committedTimestamp; - long createdTimestamp = System.currentTimeMillis(); + Date createdTimestamp = new Date(); public Long getObjectId() { return objectId; @@ -78,45 +79,40 @@ public void setSubStreams(String subStreams) { this.subStreams = subStreams; } - public long getBaseDataTimestamp() { + public Date getBaseDataTimestamp() { return baseDataTimestamp; } - public void setBaseDataTimestamp(long baseDataTimestamp) { + public void setBaseDataTimestamp(Date baseDataTimestamp) { this.baseDataTimestamp = baseDataTimestamp; } - public long getCommittedTimestamp() { + public Date getCommittedTimestamp() { return committedTimestamp; } - public void setCommittedTimestamp(long committedTimestamp) { + public void setCommittedTimestamp(Date committedTimestamp) { this.committedTimestamp = committedTimestamp; } - public long getCreatedTimestamp() { + public Date getCreatedTimestamp() { return createdTimestamp; } - public void setCreatedTimestamp(long createdTimestamp) { + public void setCreatedTimestamp(Date createdTimestamp) { this.createdTimestamp = createdTimestamp; } @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - S3WalObject object = (S3WalObject) o; - return baseDataTimestamp == object.baseDataTimestamp && committedTimestamp == object.committedTimestamp - && objectId.equals(object.objectId) && nodeId.equals(object.nodeId) - && objectSize.equals(object.objectSize) && sequenceId.equals(object.sequenceId) - && Objects.equals(subStreams, object.subStreams); + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + S3WalObject that = (S3WalObject) o; + return Objects.equals(objectId, that.objectId) && Objects.equals(nodeId, that.nodeId) && Objects.equals(objectSize, that.objectSize) && Objects.equals(sequenceId, that.sequenceId) && Objects.equals(subStreams, that.subStreams) && Objects.equals(baseDataTimestamp, that.baseDataTimestamp) && Objects.equals(committedTimestamp, that.committedTimestamp) && Objects.equals(createdTimestamp, that.createdTimestamp); } @Override public int hashCode() { - return Objects.hash(objectId, nodeId, objectSize, sequenceId, subStreams, baseDataTimestamp, committedTimestamp); + return Objects.hash(objectId, nodeId, objectSize, sequenceId, subStreams, baseDataTimestamp, committedTimestamp, createdTimestamp); } } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/mapper/S3StreamObjectMapper.java b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/mapper/S3StreamObjectMapper.java index b01102b4d..3993eaaf7 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/mapper/S3StreamObjectMapper.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/metadata/database/mapper/S3StreamObjectMapper.java @@ -20,6 +20,7 @@ import com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject; import org.apache.ibatis.annotations.Param; +import java.util.Date; import java.util.List; public interface S3StreamObjectMapper { @@ -48,5 +49,5 @@ List list(@Param("objectId") Long objectId, int batchDelete(@Param("objectIds") List objectIds); - List recyclable(@Param("streamIds") List streamIds, @Param("threshold") long threshold); + List recyclable(@Param("streamIds") List streamIds, @Param("threshold") Date threshold); } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/tasks/RecycleS3Task.java b/controller/src/main/java/com/automq/rocketmq/controller/tasks/RecycleS3Task.java index 0e2d278f2..7ffc5c5d3 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/tasks/RecycleS3Task.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/tasks/RecycleS3Task.java @@ -28,6 +28,7 @@ import com.automq.rocketmq.controller.metadata.database.mapper.TopicMapper; import java.util.ArrayList; import java.util.Calendar; +import java.util.Date; import java.util.List; import org.apache.ibatis.session.SqlSession; @@ -55,7 +56,7 @@ public void process() throws ControllerException { for (Topic topic : topics) { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.HOUR, -topic.getRetentionHours()); - long threshold = calendar.getTimeInMillis(); + Date threshold = calendar.getTime(); List streamIds = streamMapper.list(topic.getId(), null, null) .stream() .map(Stream::getId) diff --git a/controller/src/main/resources/ddl.sql b/controller/src/main/resources/ddl.sql index 0edb9b05b..196eba0a4 100644 --- a/controller/src/main/resources/ddl.sql +++ b/controller/src/main/resources/ddl.sql @@ -149,9 +149,9 @@ CREATE TABLE IF NOT EXISTS s3streamobject end_offset BIGINT NOT NULL, object_id BIGINT NOT NULL, object_size BIGINT NOT NULL, - base_data_timestamp BIGINT, - committed_timestamp BIGINT, - created_timestamp BIGINT, + base_data_timestamp TIMESTAMP(3), + committed_timestamp TIMESTAMP(3), + created_timestamp TIMESTAMP(3), UNIQUE INDEX uk_s3_stream_object_object_id (object_id), INDEX idx_s3_stream_object_stream_id (stream_id, start_offset) ); @@ -163,9 +163,9 @@ CREATE TABLE IF NOT EXISTS s3walobject node_id INT NOT NULL, sequence_id BIGINT NOT NULL, sub_streams LONGTEXT NOT NULL, -- immutable - base_data_timestamp BIGINT, - committed_timestamp BIGINT, - created_timestamp BIGINT, + base_data_timestamp TIMESTAMP(3), + committed_timestamp TIMESTAMP(3), + created_timestamp TIMESTAMP(3), UNIQUE INDEX uk_s3_wal_object_node_sequence_id (node_id, sequence_id), INDEX idx_s3_wal_object_object_id (object_id) ); diff --git a/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java index 7ce218143..5469c9051 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java @@ -987,7 +987,7 @@ public void testTrimStream_WithS3Stream() throws IOException, ExecutionException S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); S3StreamObject s3StreamObject = new S3StreamObject(); - s3StreamObject.setBaseDataTimestamp(System.currentTimeMillis()); + s3StreamObject.setBaseDataTimestamp(new Date()); s3StreamObject.setStreamId(streamId); s3StreamObject.setObjectId(objectId); s3StreamObject.setStartOffset(0L); @@ -1348,8 +1348,8 @@ public void test3StreamObjects_2PC() throws IOException, ExecutionException, Int } com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(objectId + 2); - Assertions.assertTrue(object.getBaseDataTimestamp() > 0); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); } } @@ -1470,10 +1470,10 @@ public void test3StreamObjects_2PC_NoCompacted() throws IOException, ExecutionEx Assertions.assertEquals(streamId, s3Object.getStreamId()); com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(objectId + 2); - Assertions.assertTrue(object.getBaseDataTimestamp() > 1); - Assertions.assertTrue(object.getBaseDataTimestamp() > 0); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 1); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); } } @@ -1534,8 +1534,8 @@ public void test3StreamObjects_2PC_duplicate() throws IOException, ExecutionExce Assertions.assertEquals(S3ObjectState.BOS_COMMITTED, s3Object.getState()); com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(objectId + 2); - Assertions.assertTrue(object.getBaseDataTimestamp() > 0); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); } } @@ -1620,9 +1620,9 @@ public void test3WALObjects_2PC_NoS3Stream() throws IOException, ExecutionExcept S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); S3WalObject object = s3WALObjectMapper.getByObjectId(objectId + 4); - Assertions.assertTrue(object.getBaseDataTimestamp() > 0); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); Assertions.assertEquals(objectId + 2, object.getSequenceId()); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); } } @@ -1686,8 +1686,8 @@ public void test3WALObjects_2PC_NoCompacted() throws IOException, ExecutionExcep S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); S3WalObject object = s3WALObjectMapper.getByObjectId(objectId + 4); Assertions.assertEquals(objectId + 4, object.getSequenceId()); - Assertions.assertTrue(object.getBaseDataTimestamp() > 1); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 1); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); } } @@ -1743,7 +1743,7 @@ public void test3WALObjects_2PC_Expired() throws IOException, ExecutionException .setObjectId(s3StreamObject.getObjectId()) .setStreamId(s3StreamObject.getStreamId()) .setObjectSize(s3StreamObject.getObjectSize()) - .setBaseDataTimestamp(s3StreamObject.getBaseDataTimestamp()) + .setBaseDataTimestamp(s3StreamObject.getBaseDataTimestamp().getTime()) .setStartOffset(s3StreamObject.getStartOffset()) .setEndOffset(s3StreamObject.getEndOffset()) .build()) @@ -1831,7 +1831,7 @@ public void test3WALObjects_2PC() throws IOException, ExecutionException, Interr .setObjectId(s3StreamObject.getObjectId()) .setStreamId(s3StreamObject.getStreamId()) .setObjectSize(s3StreamObject.getObjectSize()) - .setBaseDataTimestamp(s3StreamObject.getBaseDataTimestamp()) + .setBaseDataTimestamp(s3StreamObject.getBaseDataTimestamp().getTime()) .setStartOffset(s3StreamObject.getStartOffset()) .setEndOffset(s3StreamObject.getEndOffset()) .build()) @@ -1878,7 +1878,7 @@ public void test3WALObjects_2PC() throws IOException, ExecutionException, Interr S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); for (long index = objectId; index < objectId + 2; index++) { com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(index); - if (object.getCommittedTimestamp() - time > 5 * 60) { + if (object.getCommittedTimestamp().getTime() - time > 5 * 60) { Assertions.fail(); } } @@ -1889,9 +1889,9 @@ public void test3WALObjects_2PC() throws IOException, ExecutionException, Interr S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); S3WalObject object = s3WALObjectMapper.getByObjectId(objectId + 4); - Assertions.assertTrue(object.getBaseDataTimestamp() > 0); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); Assertions.assertEquals(objectId + 2, object.getSequenceId()); - if (object.getCommittedTimestamp() - time > 5 * 60) { + if (object.getCommittedTimestamp().getTime() - time > 5 * 60) { Assertions.fail(); } } @@ -1950,7 +1950,7 @@ public void test3WALObjects_2PC_duplicate() throws IOException, ExecutionExcepti .setObjectId(s3StreamObject.getObjectId()) .setStreamId(s3StreamObject.getStreamId()) .setObjectSize(s3StreamObject.getObjectSize()) - .setBaseDataTimestamp(s3StreamObject.getBaseDataTimestamp()) + .setBaseDataTimestamp(s3StreamObject.getBaseDataTimestamp().getTime()) .setStartOffset(s3StreamObject.getStartOffset()) .setEndOffset(s3StreamObject.getEndOffset()) .build()) @@ -1998,7 +1998,7 @@ public void test3WALObjects_2PC_duplicate() throws IOException, ExecutionExcepti S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); for (long index = objectId; index < objectId + 2; index++) { com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(index); - if (object.getCommittedTimestamp() - time > 5 * 60) { + if (object.getCommittedTimestamp().getTime() - time > 5 * 60) { Assertions.fail(); } } @@ -2010,8 +2010,8 @@ public void test3WALObjects_2PC_duplicate() throws IOException, ExecutionExcepti S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); S3WalObject object = s3WALObjectMapper.getByObjectId(objectId + 4); Assertions.assertEquals(objectId + 2, object.getSequenceId()); - Assertions.assertTrue(object.getBaseDataTimestamp() > 0); - if (object.getCommittedTimestamp() - time > 5 * 60) { + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + if (object.getCommittedTimestamp().getTime() - time > 5 * 60) { Assertions.fail(); } } diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/DatabaseTestBase.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/DatabaseTestBase.java index effe63d43..b364f8a89 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/metadata/DatabaseTestBase.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/DatabaseTestBase.java @@ -41,6 +41,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Calendar; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -144,7 +145,7 @@ protected List buildS3WalObjs(long objectId, int count) { s3StreamObject.setObjectSize(100 + i); s3StreamObject.setSequenceId(objectId + i); s3StreamObject.setNodeId((int) i + 1); - s3StreamObject.setBaseDataTimestamp(System.currentTimeMillis()); + s3StreamObject.setBaseDataTimestamp(new Date()); s3StreamObjects.add(s3StreamObject); } diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/S3StreamObjectTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/S3StreamObjectTest.java index 2953a5eba..a06cb4701 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/metadata/S3StreamObjectTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/S3StreamObjectTest.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.util.Calendar; +import java.util.Date; import java.util.List; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @@ -47,7 +48,7 @@ public void testS3StreamObjectCRUD() throws IOException { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, 30); - long time = calendar.getTime().getTime(); + Date time = calendar.getTime(); s3StreamObject.setBaseDataTimestamp(time); @@ -57,7 +58,13 @@ public void testS3StreamObjectCRUD() throws IOException { // test getById S3StreamObject s3StreamObject1 = s3StreamObjectMapper.getById(s3StreamObject.getId()); - Assertions.assertEquals(s3StreamObject, s3StreamObject1); + Assertions.assertEquals(s3StreamObject.getId(), s3StreamObject1.getId()); + Assertions.assertEquals(s3StreamObject.getObjectId(), s3StreamObject1.getObjectId()); + Assertions.assertEquals(s3StreamObject.getObjectSize(), s3StreamObject1.getObjectSize()); + Assertions.assertEquals(s3StreamObject.getStreamId(), s3StreamObject1.getStreamId()); + Assertions.assertEquals(s3StreamObject.getStartOffset(), s3StreamObject1.getStartOffset()); + Assertions.assertEquals(s3StreamObject.getEndOffset(), s3StreamObject1.getEndOffset()); + Assertions.assertEquals(s3StreamObject.getCommittedTimestamp(), s3StreamObject1.getCommittedTimestamp()); // test getByStreamAndObject S3StreamObject s3StreamObject2 = s3StreamObjectMapper.getByStreamAndObject(s3StreamObject.getStreamId(), s3StreamObject.getObjectId()); @@ -84,7 +91,14 @@ public void testS3StreamObjectCRUD() throws IOException { // test list s3StreamObjects = s3StreamObjectMapper.list(null, s3StreamObject.getStreamId(), 2000L, 2111L, 1); Assertions.assertEquals(1, s3StreamObjects.size()); - Assertions.assertEquals(s3StreamObject, s3StreamObjects.get(0)); + Assertions.assertEquals(s3StreamObject.getId(), s3StreamObjects.get(0).getId()); + Assertions.assertEquals(s3StreamObject.getObjectId(), s3StreamObjects.get(0).getObjectId()); + Assertions.assertEquals(s3StreamObject.getObjectSize(), s3StreamObjects.get(0).getObjectSize()); + Assertions.assertEquals(s3StreamObject.getStreamId(), s3StreamObjects.get(0).getStreamId()); + Assertions.assertEquals(s3StreamObject.getStartOffset(), s3StreamObjects.get(0).getStartOffset()); + Assertions.assertEquals(s3StreamObject.getEndOffset(), s3StreamObjects.get(0).getEndOffset()); + Assertions.assertEquals(s3StreamObject.getCommittedTimestamp(), s3StreamObjects.get(0).getCommittedTimestamp()); + // test delete s3StreamObjectMapper.delete(s3StreamObject1.getId(), null, null); @@ -107,14 +121,20 @@ public void testCommitS3StreamObject() throws IOException { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, 30); - long time = calendar.getTime().getTime(); + Date time = calendar.getTime(); s3StreamObject.setCommittedTimestamp(time); int affectedRows = s3StreamObjectMapper.commit(s3StreamObject); Assertions.assertEquals(1, affectedRows); S3StreamObject s3StreamObject2 = s3StreamObjectMapper.getById(s3StreamObject.getId()); - Assertions.assertEquals(s3StreamObject, s3StreamObject2); + Assertions.assertEquals(s3StreamObject.getId(), s3StreamObject2.getId()); + Assertions.assertEquals(s3StreamObject.getObjectId(), s3StreamObject2.getObjectId()); + Assertions.assertEquals(s3StreamObject.getObjectSize(), s3StreamObject2.getObjectSize()); + Assertions.assertEquals(s3StreamObject.getStreamId(), s3StreamObject2.getStreamId()); + Assertions.assertEquals(s3StreamObject.getStartOffset(), s3StreamObject2.getStartOffset()); + Assertions.assertEquals(s3StreamObject.getEndOffset(), s3StreamObject2.getEndOffset()); + Assertions.assertEquals(s3StreamObject.getCommittedTimestamp(), s3StreamObject2.getCommittedTimestamp()); s3StreamObjectMapper.delete(null, s3StreamObject2.getStreamId(), s3StreamObject2.getObjectId()); List s3StreamObjects = s3StreamObjectMapper.list(s3StreamObject.getObjectId(), null, null, null, null); @@ -135,7 +155,7 @@ public void testBatchDelete() throws IOException { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, 30); - long time = calendar.getTime().getTime(); + Date time = calendar.getTime(); s3StreamObject.setCommittedTimestamp(time); s3StreamObjectMapper.commit(s3StreamObject); @@ -157,7 +177,7 @@ public void testListRecyclable() throws IOException { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, 30); - long time = calendar.getTime().getTime(); + Date time = calendar.getTime(); s3StreamObject.setCommittedTimestamp(time); s3StreamObjectMapper.commit(s3StreamObject); @@ -165,18 +185,18 @@ public void testListRecyclable() throws IOException { Calendar threshold = Calendar.getInstance(); threshold.add(Calendar.HOUR, 1); - Assertions.assertTrue(threshold.getTimeInMillis() > time); + Assertions.assertTrue(threshold.getTimeInMillis() > time.getTime()); List objects = s3StreamObjectMapper.list(null, null, null, null, null); - Assertions.assertTrue(objects.get(0).getCommittedTimestamp() < threshold.getTimeInMillis()); + Assertions.assertTrue(objects.get(0).getCommittedTimestamp().getTime() < threshold.getTimeInMillis()); Assertions.assertEquals(111, objects.get(0).getStreamId()); - List ids = s3StreamObjectMapper.recyclable(List.of(111L), threshold.getTimeInMillis()); + List ids = s3StreamObjectMapper.recyclable(List.of(111L), threshold.getTime()); Assertions.assertEquals(1, ids.size()); threshold = Calendar.getInstance(); threshold.add(Calendar.HOUR, -80); - ids = s3StreamObjectMapper.recyclable(List.of(111L), threshold.getTimeInMillis()); + ids = s3StreamObjectMapper.recyclable(List.of(111L), threshold.getTime()); Assertions.assertTrue(ids.isEmpty()); } } diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/S3WalObjectTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/S3WalObjectTest.java index ca67f3754..116298b1b 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/metadata/S3WalObjectTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/S3WalObjectTest.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.Calendar; +import java.util.Date; import java.util.List; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @@ -47,7 +48,7 @@ public void testS3WalObjectCRUD() throws IOException { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, 30); - long time = calendar.getTime().getTime(); + Date time = calendar.getTime(); s3WALObject.setBaseDataTimestamp(time); @@ -55,7 +56,12 @@ public void testS3WalObjectCRUD() throws IOException { Assertions.assertEquals(1, affectedRows); S3WalObject s3WalObject1 = s3WalObjectMapper.getByObjectId(s3WALObject.getObjectId()); - Assertions.assertEquals(s3WALObject, s3WalObject1); + Assertions.assertEquals(s3WALObject.getObjectId(), s3WalObject1.getObjectId()); + Assertions.assertEquals(s3WALObject.getObjectSize(), s3WalObject1.getObjectSize()); + Assertions.assertEquals(s3WALObject.getNodeId(), s3WalObject1.getNodeId()); + Assertions.assertEquals(s3WALObject.getSequenceId(), s3WalObject1.getSequenceId()); + Assertions.assertEquals(s3WALObject.getSubStreams(), s3WalObject1.getSubStreams()); + Assertions.assertEquals(s3WALObject.getBaseDataTimestamp(), s3WalObject1.getBaseDataTimestamp()); List s3WalObjects = s3WalObjectMapper.list(s3WalObject1.getNodeId(), s3WalObject1.getSequenceId()); Assertions.assertNotNull(s3WalObjects); @@ -90,7 +96,7 @@ public void testCommitS3WalObject() throws IOException { Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND, 30); - long time = calendar.getTime().getTime(); + Date time = calendar.getTime(); s3WALObject.setBaseDataTimestamp(time); @@ -98,12 +104,18 @@ public void testCommitS3WalObject() throws IOException { Assertions.assertEquals(1, affectedRows); S3WalObject s3WalObject1 = s3WalObjectMapper.getByObjectId(s3WALObject.getObjectId()); - s3WalObject1.setCommittedTimestamp(time + 10 * 1000); + s3WalObject1.setCommittedTimestamp(new Date(time.getTime() + 10 * 1000)); affectedRows = s3WalObjectMapper.commit(s3WalObject1); S3WalObject s3WalObject2 = s3WalObjectMapper.getByObjectId(s3WalObject1.getObjectId()); Assertions.assertEquals(1, affectedRows); - Assertions.assertEquals(s3WalObject1, s3WalObject2); + Assertions.assertEquals(s3WalObject1.getObjectId(), s3WalObject2.getObjectId()); + Assertions.assertEquals(s3WalObject1.getObjectSize(), s3WalObject2.getObjectSize()); + Assertions.assertEquals(s3WalObject1.getNodeId(), s3WalObject2.getNodeId()); + Assertions.assertEquals(s3WalObject1.getSequenceId(), s3WalObject2.getSequenceId()); + Assertions.assertEquals(s3WalObject1.getSubStreams(), s3WalObject2.getSubStreams()); + Assertions.assertEquals(s3WalObject1.getBaseDataTimestamp(), s3WalObject2.getBaseDataTimestamp()); + Assertions.assertEquals(s3WalObject1.getCommittedTimestamp(), s3WalObject2.getCommittedTimestamp()); s3WalObjectMapper.delete(null, s3WalObject1.getNodeId(), null); List s3WalObjects = s3WalObjectMapper.list(null, s3WalObject1.getSequenceId()); diff --git a/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java index 432320fd2..f3e92aa5b 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/metadata/database/DefaultMetadataStoreTest.java @@ -1595,8 +1595,8 @@ public void testCommitStreamObject() throws IOException, ControllerException { com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(objectId + 2); Assertions.assertEquals(111L, object.getObjectSize()); Assertions.assertEquals(streamId, object.getStreamId()); - Assertions.assertTrue(object.getBaseDataTimestamp() > 0); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); } } @@ -1639,8 +1639,8 @@ public void testCommitStreamObject_NoCompacted() throws IOException, ControllerE Assertions.assertEquals(S3ObjectState.BOS_COMMITTED, s3Object.getState()); com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(objectId + 2); - Assertions.assertTrue(object.getBaseDataTimestamp() > 0); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); Assertions.assertEquals(111L, object.getObjectSize()); Assertions.assertEquals(streamId, object.getStreamId()); } @@ -1749,7 +1749,7 @@ public void testCommitWALObject() throws IOException, ExecutionException, Interr .setObjectId(s3StreamObject2.getObjectId()) .setStreamId(s3StreamObject2.getStreamId()) .setObjectSize(s3StreamObject2.getObjectSize()) - .setBaseDataTimestamp(s3StreamObject2.getBaseDataTimestamp()) + .setBaseDataTimestamp(s3StreamObject2.getBaseDataTimestamp().getTime()) .setStartOffset(s3StreamObject2.getStartOffset()) .setEndOffset(s3StreamObject2.getEndOffset()) .build()).toList(); @@ -1779,7 +1779,7 @@ public void testCommitWALObject() throws IOException, ExecutionException, Interr S3StreamObjectMapper s3StreamObjectMapper = session.getMapper(S3StreamObjectMapper.class); for (long index = objectId; index < objectId + 2; index++) { com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject object = s3StreamObjectMapper.getByObjectId(index); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); } S3Object s3Object = s3ObjectMapper.getById(objectId + 4); @@ -1789,8 +1789,8 @@ public void testCommitWALObject() throws IOException, ExecutionException, Interr S3WalObjectMapper s3WALObjectMapper = session.getMapper(S3WalObjectMapper.class); S3WalObject object = s3WALObjectMapper.getByObjectId(objectId + 4); Assertions.assertEquals(objectId + 2, object.getSequenceId()); - Assertions.assertTrue(object.getBaseDataTimestamp() > 0); - Assertions.assertTrue(object.getCommittedTimestamp() > 0); + Assertions.assertTrue(object.getBaseDataTimestamp().getTime() > 0); + Assertions.assertTrue(object.getCommittedTimestamp().getTime() > 0); } } @@ -1851,6 +1851,7 @@ public void testCommitWalObject_WalNotExist() throws IOException, ExecutionExcep long streamId; int nodeId = 1; long objectId; + Calendar calendar = Calendar.getInstance(); S3WALObject walObject = S3WALObject.newBuilder() .setObjectId(-1) @@ -1887,7 +1888,7 @@ public void testCommitWalObject_WalNotExist() throws IOException, ExecutionExcep s3Object.setState(S3ObjectState.BOS_PREPARED); s3Object.setStreamId(streamId); s3Object.setObjectSize(2139L); - Calendar calendar = Calendar.getInstance(); + calendar.add(Calendar.HOUR, 1); s3Object.setExpiredTimestamp(calendar.getTime()); objectMapper.prepare(s3Object); @@ -1904,10 +1905,11 @@ public void testCommitWalObject_WalNotExist() throws IOException, ExecutionExcep .atMost(10, TimeUnit.SECONDS) .until(metadataStore::isLeader); + calendar.add(Calendar.HOUR, 2); S3StreamObject streamObject = S3StreamObject.newBuilder() .setObjectId(objectId) .setStreamId(streamId) - .setBaseDataTimestamp(1) + .setBaseDataTimestamp(calendar.getTimeInMillis()) .setStartOffset(0) .setEndOffset(2) .setObjectSize(2139) @@ -1921,7 +1923,7 @@ public void testCommitWalObject_WalNotExist() throws IOException, ExecutionExcep try (SqlSession session = getSessionFactory().openSession()) { S3StreamObjectMapper mapper = session.getMapper(S3StreamObjectMapper.class); com.automq.rocketmq.controller.metadata.database.dao.S3StreamObject s3StreamObject = mapper.getByObjectId(objectId); - Assertions.assertTrue(s3StreamObject.getCommittedTimestamp() > 0); + Assertions.assertTrue(s3StreamObject.getCommittedTimestamp().getTime() > 0); S3ObjectMapper objectMapper = session.getMapper(S3ObjectMapper.class); S3Object s3Object = objectMapper.getById(objectId);