diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java index e7526817..be8498a7 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStream.java @@ -30,9 +30,6 @@ public interface GroupedStream { GroupedStream count(); - GroupedStream count(SelectAction selectAction); - - GroupedStream min(SelectAction selectAction); diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java index 71dd73da..b60a1037 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java @@ -76,22 +76,6 @@ public GroupedStream count() { return this.pipeline.addGroupedStreamVirtualNode(graphNode, parent); } - @Override - public GroupedStream count(SelectAction selectAction) { - String name = OperatorNameMaker.makeName(COUNT_PREFIX, pipeline.getJobId()); - - Supplier> supplier = new AggregateSupplier<>(name, parent.getName(), () -> 0, (K key, V value, Integer agg) -> agg + 1); - - GraphNode graphNode; - if (this.parent.shuffleNode()) { - graphNode = new ShuffleProcessorNode<>(name, parent.getName(), supplier); - } else { - graphNode = new ProcessorNode<>(name, parent.getName(), supplier); - } - - return this.pipeline.addGroupedStreamVirtualNode(graphNode, parent); - } - @Override public GroupedStream min(SelectAction selectAction) { String name = OperatorNameMaker.makeName(MIN_PREFIX, pipeline.getJobId()); diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java index 7e6f0877..dd8c877a 100644 --- a/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java +++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/MessageQueueListenerWrapper.java @@ -41,8 +41,6 @@ class MessageQueueListenerWrapper implements MessageQueueListener { private BiFunction, Set, Throwable> recoverHandler; - private Function, Throwable> resetOffsetHandler; - MessageQueueListenerWrapper(MessageQueueListener originListener, TopologyBuilder topologyBuilder) { this.originListener = originListener; this.topologyBuilder = topologyBuilder; @@ -103,8 +101,4 @@ Processor selectProcessor(String key) { public void setRecoverHandler(BiFunction, Set, Throwable> handler) { this.recoverHandler = handler; } - - public void setResetOffsetHandler(Function, Throwable> handler) { - this.resetOffsetHandler = handler; - } } diff --git a/example/src/main/java/org/apache/rocketmq/streams/examples/pojo/Demo.java b/example/src/main/java/org/apache/rocketmq/streams/examples/pojo/Demo.java deleted file mode 100644 index 81c21ffb..00000000 --- a/example/src/main/java/org/apache/rocketmq/streams/examples/pojo/Demo.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.apache.rocketmq.streams.examples.pojo; -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -import com.alibaba.fastjson.JSON; -import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.streams.core.RocketMQStream; -import org.apache.rocketmq.streams.core.rstream.StreamBuilder; -import org.apache.rocketmq.streams.core.serialization.KeyValueDeserializer; -import org.apache.rocketmq.streams.core.topology.TopologyBuilder; -import org.apache.rocketmq.streams.core.util.Pair; - -import java.util.Properties; - -/** - * 1、启动RocketMQ - * 2、创建topic - * 3、启动本例子运行 - * 4、向topic中写入数据 - * 5、观察输出结果 - */ -public class Demo { - public static void main(String[] args) throws Throwable { - StreamBuilder builder = new StreamBuilder("demo"); - - builder.source("user", new KeyValueDeserializer() { - @Override - public Pair deserialize(byte[] total) throws Throwable { - //对象需要有默认构造器 - User user = JSON.parseObject(total, User.class); - return new Pair<>(null, user); - } - }) - .keyBy(User::getAge) - .count(User::getName) - .toRStream() - .print(); - - TopologyBuilder topologyBuilder = builder.build(); - - Properties properties = new Properties(); - properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); - - RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); - Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") { - @Override - public void run() { - rocketMQStream.stop(); - } - }); - - rocketMQStream.start(); - } - -}