Skip to content

Commit

Permalink
fix: key expand (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 21, 2025
1 parent 52b2961 commit 13c02f3
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public void start() {

commands = new Commands(commandConfig);

ScheduledExecutorService scheduler = LocalStorageExecutors.getInstance().getScheduler();
scheduler.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.SECONDS);
ScheduledExecutorService scheduler = LocalStorageExecutors.getInstance().getFlushScheduler();
scheduler.scheduleAtFixedRate(this::flush, 10, 10, TimeUnit.SECONDS);

logger.info("local storage client start success, dir = {}", dir);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public class LocalStorageExecutors {

private final MpscSlotHashExecutor commandExecutor;
private final FlushExecutor flushExecutor;
private final ScheduledExecutorService scheduler;
private final ScheduledExecutorService flushScheduler;
private final ScheduledExecutorService walScheduler;

private LocalStorageExecutors() {
{
Expand All @@ -39,9 +40,13 @@ private LocalStorageExecutors() {
logger.info("local storage flush executor init success, threads = {}, queueSize = {}", threads, queueSize);
}
{
scheduler = Executors.newScheduledThreadPool(1, new CamelliaThreadFactory("local-storage-schedule"));
flushScheduler = Executors.newScheduledThreadPool(1, new CamelliaThreadFactory("local-storage-flush-schedule"));
logger.info("local storage flush scheduler init success");
}
{
walScheduler = Executors.newScheduledThreadPool(1, new CamelliaThreadFactory("local-storage-wal-schedule"));
logger.info("local storage wal scheduler init success");
}
}

public static LocalStorageExecutors getInstance() {
Expand All @@ -63,7 +68,11 @@ public FlushExecutor getFlushExecutor() {
return flushExecutor;
}

public ScheduledExecutorService getScheduler() {
return scheduler;
public ScheduledExecutorService getFlushScheduler() {
return flushScheduler;
}

public ScheduledExecutorService getWalScheduler() {
return walScheduler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ public void load() throws IOException {
ByteBuffer buffer1 = ByteBuffer.wrap(magic_header);
while (buffer1.hasRemaining()) {
int write = fileChannel.write(buffer1);
logger.info("init key.manifest.file, magic_header, key.manifest.file = {}, result = {}", fileName, write);
logger.info("init key.manifest.file, magic_header, key.manifest.file = {}, write.len = {}", fileName, write);
}
ByteBuffer buffer2 = ByteBuffer.wrap(magic_footer);
while (buffer2.hasRemaining()) {
int write = fileChannel.write(buffer2, magic_header.length + RedisClusterCRC16Utils.SLOT_SIZE * (8+8+4));
logger.info("init key.manifest.file, magic_footer, key.manifest.file = {}, result = {}", fileName, write);
logger.info("init key.manifest.file, magic_footer, key.manifest.file = {}, write.len = {}", fileName, write);
}
} else {
int len = magic_header.length + magic_footer.length + RedisClusterCRC16Utils.SLOT_SIZE * (8+8+4);
Expand Down Expand Up @@ -205,10 +205,38 @@ public SlotInfo expand(short slot) throws IOException {
}
}
}
//尝试其他文件
for (Map.Entry<Long, BitSet> entry : fileBitsMap.entrySet()) {
Long otherFileId = entry.getKey();
if (otherFileId == fileId) {
continue;
}
BitSet bitSet = entry.getValue();
if (LocalStorageConstants.key_manifest_bit_size - bitSet.cardinality() >= bitsStep*2) {
for (int i = 0; i< LocalStorageConstants.key_manifest_bit_size -bitsStep*2; i++) {
if (bitSet.get(i, i + bitsStep * 2).cardinality() == 0) {
//clear old
for (int j=bitsStart; j<bitsEnd; j++) {
bitSet.set(j, false);
}
//set new
for (int j=i; j<i+bitsStep*2; j++) {
bitSet.set(j, true);
}
//update
update(slot, otherFileId, (long) i * LocalStorageConstants._64k, capacity*2);
return slotInfoMap.get(slot);
}
}
}
}
//分配不出来就使用新文件
fileId = initFileId();
update(slot, fileId, 0, LocalStorageConstants._64k);
fileBitsMap.get(fileId).set(0, true);
update(slot, fileId, 0, capacity*2);
BitSet bitSet = fileBitsMap.get(fileId);
for (int i=0; i<bitsStep*2; i++) {
bitSet.set(i, true);
}
return slotInfoMap.get(slot);
} finally {
readWriteLock.writeLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ public class SlotKeyReadWrite {

private static final Logger logger = LoggerFactory.getLogger(SlotKeyReadWrite.class);

private static int keyFlushSize;
private static int keyFlushIntervalSeconds;
static {
updateConf();
ProxyDynamicConf.registerCallback(SlotKeyReadWrite::updateConf);
}
private static void updateConf() {
int keyFlushSize = ProxyDynamicConf.getInt("local.storage.key.flush.size", 200);
if (SlotKeyReadWrite.keyFlushSize != keyFlushSize) {
SlotKeyReadWrite.keyFlushSize = keyFlushSize;
logger.info("local.storage.key.flush.size, update to {}", keyFlushSize);
}
int keyFlushIntervalSeconds = ProxyDynamicConf.getInt("local.storage.key.flush.interval.seconds", 300);
if (SlotKeyReadWrite.keyFlushIntervalSeconds != keyFlushIntervalSeconds) {
SlotKeyReadWrite.keyFlushIntervalSeconds = keyFlushIntervalSeconds;
logger.info("local.storage.key.flush.interval.seconds, update to {}", keyFlushIntervalSeconds);
}
}

private long lastFlushTime = TimeCache.currentMillis;

private final short slot;
Expand All @@ -37,30 +56,13 @@ public class SlotKeyReadWrite {

private volatile FlushStatus flushStatus = FlushStatus.FLUSH_OK;

private int keyFlushSize;
private int keyFlushIntervalSeconds;

private CompletableFuture<FlushResult> flushFuture;

public SlotKeyReadWrite(short slot, KeyFlushExecutor executor, KeyBlockReadWrite keyBlockReadWrite) {
this.slot = slot;
this.executor = executor;
this.keyBlockReadWrite = keyBlockReadWrite;
updateConf();
ProxyDynamicConf.registerCallback(this::updateConf);
}

private void updateConf() {
int keyFlushSize = ProxyDynamicConf.getInt("local.storage.key.flush.size", 200);
if (this.keyFlushSize != keyFlushSize) {
this.keyFlushSize = keyFlushSize;
logger.info("local.storage.key.flush.size, update to {}", keyFlushSize);
}
int keyFlushIntervalSeconds = ProxyDynamicConf.getInt("local.storage.key.flush.interval.seconds", 300);
if (this.keyFlushIntervalSeconds != keyFlushIntervalSeconds) {
this.keyFlushIntervalSeconds = keyFlushIntervalSeconds;
logger.info("local.storage.key.flush.interval.seconds, update to {}", keyFlushIntervalSeconds);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ public class SlotStringReadWrite {

private static final Logger logger = LoggerFactory.getLogger(SlotStringReadWrite.class);

private static int stringValueFlushSize;
private static int stringValueFlushIntervalSeconds;
static {
updateConf();
ProxyDynamicConf.registerCallback(SlotStringReadWrite::updateConf);
}
private static void updateConf() {
int stringValueFlushSize = ProxyDynamicConf.getInt("local.storage.string.value.flush.size", 200);
if (SlotStringReadWrite.stringValueFlushSize != stringValueFlushSize) {
SlotStringReadWrite.stringValueFlushSize = stringValueFlushSize;
logger.info("local.storage.string.value.flush.size, update to {}", stringValueFlushSize);
}
int stringValueFlushIntervalSeconds = ProxyDynamicConf.getInt("local.storage.string.value.flush.interval.seconds", 300);
if (SlotStringReadWrite.stringValueFlushIntervalSeconds != stringValueFlushIntervalSeconds) {
SlotStringReadWrite.stringValueFlushIntervalSeconds = stringValueFlushIntervalSeconds;
logger.info("local.storage.string.value.flush.interval.seconds, update to {}", stringValueFlushIntervalSeconds);
}
}

private long lastFlushTime = TimeCache.currentMillis;

private final short slot;
Expand All @@ -38,28 +57,11 @@ public class SlotStringReadWrite {

private CompletableFuture<FlushResult> flushFuture;

private int stringValueFlushSize;
private int stringValueFlushIntervalSeconds;

public SlotStringReadWrite(short slot, ValueFlushExecutor flushExecutor, StringBlockReadWrite slotStringBlockCache) {
this.slot = slot;
this.flushExecutor = flushExecutor;
this.stringBlockReadWrite = slotStringBlockCache;
updateConf();
ProxyDynamicConf.registerCallback(this::updateConf);
}

private void updateConf() {
int stringValueFlushSize = ProxyDynamicConf.getInt("local.storage.string.value.flush.size", 200);
if (this.stringValueFlushSize != stringValueFlushSize) {
this.stringValueFlushSize = stringValueFlushSize;
logger.info("local.storage.string.value.flush.size, update to {}", stringValueFlushSize);
}
int stringValueFlushIntervalSeconds = ProxyDynamicConf.getInt("local.storage.string.value.flush.interval.seconds", 300);
if (this.stringValueFlushIntervalSeconds != stringValueFlushIntervalSeconds) {
this.stringValueFlushIntervalSeconds = stringValueFlushIntervalSeconds;
logger.info("local.storage.string.value.flush.interval.seconds, update to {}", stringValueFlushIntervalSeconds);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created by caojiajun on 2025/1/17
Expand All @@ -20,6 +21,13 @@ public interface IWalManifest {
*/
long fileId(short slot) throws IOException;

/**
* get write lock
* @param fileId fileId
* @return lock
*/
ReentrantLock getWriteLock(long fileId);

/**
* wal文件已经写到哪里了
* @param fileId 文件id
Expand Down
Loading

0 comments on commit 13c02f3

Please sign in to comment.