Skip to content

Commit

Permalink
fix(controller): fix bug in recycling stream object (#577)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Nov 6, 2023
1 parent 097afdc commit 3730ced
Showing 1 changed file with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package com.automq.rocketmq.controller.server.tasks;

import apache.rocketmq.controller.v1.Code;
import apache.rocketmq.controller.v1.TopicStatus;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.metadata.dao.S3ObjectCriteria;
import com.automq.rocketmq.metadata.dao.Stream;
import com.automq.rocketmq.metadata.dao.StreamCriteria;
Expand All @@ -31,7 +32,9 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.ibatis.session.SqlSession;

public class RecycleS3Task extends ControllerTask {
Expand Down Expand Up @@ -73,18 +76,31 @@ public void process() throws ControllerException {
return;
}

metadataStore.getDataStore().batchDeleteS3Objects(recyclable)
.whenComplete((list, e) -> {
if (null != e) {
LOGGER.error("DataStore failed to delete S3 objects", e);
return;
}
LOGGER.info("Batch delete S3 objects, having object-id-list={}", list);
s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(list).build());
streamObjectMapper.batchDelete(list);
session.commit();
});
}
List<Long> result = metadataStore.getDataStore().batchDeleteS3Objects(recyclable).get();

HashSet<Long> expired = new HashSet<>(recyclable);
result.forEach(expired::remove);
LOGGER.info("Recycle {} S3 objects: deleted: [{}], expired but not deleted: [{}]",
result.size(), result.stream().map(String::valueOf).collect(Collectors.joining(", ")),
expired.stream().map(String::valueOf).collect(Collectors.joining(", "))
);

int count = s3ObjectMapper.deleteByCriteria(S3ObjectCriteria.newBuilder().addAll(result).build());
if (count != result.size()) {
LOGGER.error("Failed to delete S3 objects, having object-id-list={} but deleting {} rows", result, count);
return;
}

count = streamObjectMapper.batchDelete(result);
if (count != result.size()) {
LOGGER.error("Failed to delete S3 objects, having object-id-list={} but deleting {} rows", result, count);
return;
}

session.commit();
} catch (Exception e) {
LOGGER.error("Failed to recycle S3 Objects", e);
throw new ControllerException(Code.INTERNAL_VALUE, e);
}
}
}

0 comments on commit 3730ced

Please sign in to comment.