Skip to content

Commit

Permalink
feat(proxy): introduce TimerService
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored and daniel-y committed Oct 30, 2023
1 parent 90c342a commit cc2c072
Show file tree
Hide file tree
Showing 17 changed files with 543 additions and 303 deletions.
145 changes: 145 additions & 0 deletions common/src/main/java/com/automq/rocketmq/common/ServiceThread.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.common;

import com.automq.rocketmq.common.util.Lifecycle;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

public abstract class ServiceThread implements Runnable, Lifecycle {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

private static final long JOIN_TIME = 90 * 1000;

protected Thread thread;
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
protected volatile boolean stopped = false;
protected boolean isDaemon = false;

//Make it able to restart the thread
private final AtomicBoolean started = new AtomicBoolean(false);

public ServiceThread() {

}

public abstract String getServiceName();

@Override
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
log.info("Start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
}

@Override
public void shutdown() {
this.shutdown(false);
}

public void shutdown(final boolean interrupt) {
log.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(true, false)) {
return;
}
this.stopped = true;
log.info("shutdown thread[{}] interrupt={} ", getServiceName(), interrupt);

//if thead is waiting, wakeup it
wakeup();

try {
if (interrupt) {
this.thread.interrupt();
}

long beginTime = System.currentTimeMillis();
if (!this.thread.isDaemon()) {
this.thread.join(this.getJoinTime());
}
long elapsedTime = System.currentTimeMillis() - beginTime;
log.info("join thread[{}], elapsed time: {}ms, join time:{}ms", getServiceName(), elapsedTime, this.getJoinTime());
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}

public long getJoinTime() {
return JOIN_TIME;
}

public void makeStop() {
if (!started.get()) {
return;
}
this.stopped = true;
log.info("makestop thread[{}] ", this.getServiceName());
}

public void wakeup() {
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}

protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}

//entry to wait
waitPoint.reset();

try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}

protected void onWaitEnd() {
}

public boolean isStopped() {
return stopped;
}

public boolean isDaemon() {
return isDaemon;
}

public void setDaemon(boolean daemon) {
isDaemon = daemon;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.automq.rocketmq.proxy.service;

import com.automq.rocketmq.common.ServiceThread;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
import com.automq.rocketmq.store.model.message.Filter;
import java.util.Optional;
Expand All @@ -31,7 +32,6 @@
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.common.ProxyContext;
Expand Down
15 changes: 14 additions & 1 deletion store/src/main/fbs/store.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,17 @@ table ConsumerGroupMetadata {
table ConsumeTimes {
offset:long;
consume_times:int;
}
}

enum TimerHandlerType:short {
POP_REVIVE,
PULL_RETRY,
TIMER_MESSAGE,
TRANSACTION_MESSAGE,
}

table TimerTag {
delivery_timestamp:long;
handler_type:TimerHandlerType;
payload: [byte];
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import com.automq.rocketmq.store.service.RocksDBKVService;
import com.automq.rocketmq.store.service.SnapshotService;
import com.automq.rocketmq.store.service.StreamOperationLogService;
import com.automq.rocketmq.store.service.TimerService;
import com.automq.rocketmq.store.service.api.KVService;
import com.automq.rocketmq.store.service.api.OperationLogService;
import com.automq.stream.s3.operator.DefaultS3Operator;
import com.automq.stream.s3.operator.S3Operator;

import static com.automq.rocketmq.store.MessageStoreImpl.KV_NAMESPACE_CHECK_POINT;
import static com.automq.rocketmq.store.MessageStoreImpl.KV_NAMESPACE_TIMER_TAG;

public class MessageStoreBuilder {
public static MessageStoreImpl build(StoreConfig storeConfig, S3StreamConfig s3StreamConfig,
Expand All @@ -49,12 +49,14 @@ public static MessageStoreImpl build(StoreConfig storeConfig, S3StreamConfig s3S
InflightService inflightService = new InflightService();
SnapshotService snapshotService = new SnapshotService(streamStore, kvService);
OperationLogService operationLogService = new StreamOperationLogService(streamStore, snapshotService, storeConfig);
LogicQueueManager logicQueueManager = new DefaultLogicQueueManager(storeConfig, streamStore, kvService,
// TODO: We may have multiple timer service in the future.
TimerService timerService = new TimerService("timer_tag_0", kvService);
LogicQueueManager logicQueueManager = new DefaultLogicQueueManager(storeConfig, streamStore, kvService, timerService,
metadataService, operationLogService, inflightService);
ReviveService reviveService = new ReviveService(KV_NAMESPACE_CHECK_POINT, KV_NAMESPACE_TIMER_TAG,
kvService, metadataService, inflightService, logicQueueManager, dlqSender);
ReviveService reviveService = new ReviveService(KV_NAMESPACE_CHECK_POINT, kvService, timerService,
metadataService, inflightService, logicQueueManager, dlqSender);
S3ObjectOperator objectOperator = new S3ObjectOperatorImpl(operator);

return new MessageStoreImpl(storeConfig, streamStore, metadataService, kvService, inflightService, snapshotService, logicQueueManager, reviveService, objectOperator);
return new MessageStoreImpl(storeConfig, streamStore, metadataService, kvService, timerService, inflightService, snapshotService, logicQueueManager, reviveService, objectOperator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.automq.rocketmq.store.service.InflightService;
import com.automq.rocketmq.store.service.ReviveService;
import com.automq.rocketmq.store.service.SnapshotService;
import com.automq.rocketmq.store.service.TimerService;
import com.automq.rocketmq.store.service.api.KVService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -46,7 +47,6 @@

public class MessageStoreImpl implements MessageStore {
public static final String KV_NAMESPACE_CHECK_POINT = "check_point";
public static final String KV_NAMESPACE_TIMER_TAG = "timer_tag";
public static final String KV_NAMESPACE_FIFO_INDEX = "fifo_index";

private final AtomicBoolean started = new AtomicBoolean(false);
Expand All @@ -56,20 +56,22 @@ public class MessageStoreImpl implements MessageStore {
private final StreamStore streamStore;
private final StoreMetadataService metadataService;
private final KVService kvService;
private final TimerService timerService;
private final ReviveService reviveService;
private final InflightService inflightService;
private final SnapshotService snapshotService;
private final LogicQueueManager logicQueueManager;
private final S3ObjectOperator s3ObjectOperator;

public MessageStoreImpl(StoreConfig config, StreamStore streamStore,
StoreMetadataService metadataService, KVService kvService, InflightService inflightService,
SnapshotService snapshotService, LogicQueueManager logicQueueManager, ReviveService reviveService,
S3ObjectOperator s3ObjectOperator) {
StoreMetadataService metadataService, KVService kvService, TimerService timerService,
InflightService inflightService, SnapshotService snapshotService, LogicQueueManager logicQueueManager,
ReviveService reviveService, S3ObjectOperator s3ObjectOperator) {
this.config = config;
this.streamStore = streamStore;
this.metadataService = metadataService;
this.kvService = kvService;
this.timerService = timerService;
this.inflightService = inflightService;
this.snapshotService = snapshotService;
this.logicQueueManager = logicQueueManager;
Expand Down Expand Up @@ -101,7 +103,7 @@ public void start() throws Exception {
}
clearStateMachineData();
streamStore.start();
reviveService.start();
timerService.start();
snapshotService.start();
logicQueueManager.start();
}
Expand All @@ -113,16 +115,16 @@ public void shutdown() throws Exception {
}
logicQueueManager.shutdown();
snapshotService.shutdown();
reviveService.shutdown();
timerService.start();
streamStore.shutdown();
clearStateMachineData();
}

private void clearStateMachineData() throws StoreException {
// clear all statemachine related data in rocksdb
kvService.clear(KV_NAMESPACE_CHECK_POINT);
kvService.clear(KV_NAMESPACE_TIMER_TAG);
kvService.clear(KV_NAMESPACE_FIFO_INDEX);
timerService.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.rocketmq.store.metrics;

import com.automq.rocketmq.common.MetricsManager;
import com.automq.rocketmq.common.ServiceThread;
import com.automq.rocketmq.common.config.MetricsConfig;
import com.automq.rocketmq.store.MessageStoreImpl;
import com.automq.rocketmq.store.api.LogicQueue;
Expand All @@ -37,7 +38,6 @@
import java.util.Set;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.metrics.NopLongCounter;
import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.message.TopicQueueId;
import com.automq.rocketmq.store.service.InflightService;
import com.automq.rocketmq.store.service.TimerService;
import com.automq.rocketmq.store.service.api.KVService;
import com.automq.rocketmq.store.service.api.OperationLogService;
import com.automq.stream.utils.FutureUtil;
Expand All @@ -46,18 +47,21 @@ public class DefaultLogicQueueManager implements LogicQueueManager {
private final StoreConfig storeConfig;
private final StreamStore streamStore;
private final KVService kvService;
private final TimerService timerService;
private final StoreMetadataService metadataService;
private final OperationLogService operationLogService;
private final InflightService inflightService;
private final ConcurrentMap<TopicQueueId, CompletableFuture<LogicQueue>> logicQueueMap;
private final String identity = "[DefaultLogicQueueManager]";

public DefaultLogicQueueManager(StoreConfig storeConfig, StreamStore streamStore,
KVService kvService, StoreMetadataService metadataService, OperationLogService operationLogService,
KVService kvService, TimerService timerService, StoreMetadataService metadataService,
OperationLogService operationLogService,
InflightService inflightService) {
this.storeConfig = storeConfig;
this.streamStore = streamStore;
this.kvService = kvService;
this.timerService = timerService;
this.metadataService = metadataService;
this.operationLogService = operationLogService;
this.inflightService = inflightService;
Expand Down Expand Up @@ -148,7 +152,7 @@ public CompletableFuture<LogicQueue> createAndOpen(long topicId, int queueId) {
return CompletableFuture.completedFuture(null);
}

MessageStateMachine stateMachine = new DefaultLogicQueueStateMachine(topicId, queueId, kvService);
MessageStateMachine stateMachine = new DefaultLogicQueueStateMachine(topicId, queueId, kvService, timerService);
LogicQueue logicQueue = new StreamLogicQueue(storeConfig, topicId, queueId,
metadataService, stateMachine, streamStore, operationLogService, inflightService);

Expand Down
Loading

0 comments on commit cc2c072

Please sign in to comment.