Merge remote-tracking branch 'origin/master'
Conflicts:
	external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
lukess committed Feb 16, 2017
2 parents 0152c5e + d235a0c commit 95264d4
Showing 160 changed files with 10,245 additions and 691 deletions.
22 changes: 17 additions & 5 deletions CHANGELOG.md
@@ -1,4 +1,8 @@
## 2.0.0
* STORM-2250: Kafka Spout Refactoring to Increase Modularity and Testability
* STORM-2346: Files with unapproved licenses: download-rc-directory.sh verify-release-file.sh
* STORM-2350: Storm-HDFS's listFilesByModificationTime is broken
* STORM-1961: Stream api for storm core use cases
* STORM-2327: Introduce ConfigurableTopology
* STORM-2323: Precondition for Leader Nimbus should check all topology blobs and also corresponding dependencies.
* STORM-2305: STORM-2279 calculates task index different from grouper code
@@ -28,7 +32,6 @@
* STORM-2205: Race condition in getting nimbus summaries while ZK connections are reconnected
* STORM-1278: Port org.apache.storm.daemon.worker to java
* STORM-2192: Add a new IAutoCredentials plugin to support SSL files
* STORM-2197: NimbusClient connections leak due to leakage in ThriftClient
* STORM-2185: Storm Supervisor doesn't delete directories properly sometimes
* STORM-2188: Interrupt all executor threads before joining in executor shutdown
* STORM-203: Adding paths to default java library path
@@ -196,11 +199,13 @@
* STORM-1769: Added a test to check local nimbus with notifier plugin

## 1.1.0
* STORM-2340: fix AutoCommitMode issue in KafkaSpout
* STORM-2344: Flux YAML File Viewer for Nimbus UI
* STORM-2281: Running Multiple Kafka Spouts (Trident) Throws Illegal State Exception
* STORM-2296: Kafka spout no dup on leader changes
* STORM-2014: New Kafka spout duplicates checking if failed messages have reached max retries
* STORM-1443: [Storm SQL] Support customizing parallelism in StormSQL
* STORM-2148: [Storm SQL] Trident mode: back to code generate and compile Trident topology
* STORM-2321: Handle blobstore zk key deletion in KeySequenceNumber.
* STORM-2336: Close Localizer and AsyncLocalizer when supervisor is shutting down
* STORM-2335: Fix broken Topology visualization with empty ':transferred' in executor stats
* STORM-2331: Emitting from JavaScript should work when not anchoring.
* STORM-2320: DRPC client printer class reusable for local and remote DRPC.
* STORM-2225: change spout config to be simpler.
@@ -301,6 +306,13 @@
* STORM-1868: Modify TridentKafkaWordCount to run in distributed mode

## 1.0.3
* STORM-2197: NimbusClient connections leak due to leakage in ThriftClient
* STORM-2321: Handle blobstore zk key deletion in KeySequenceNumber.
* STORM-2324: Fix deployment failure if resources directory is missing in topology jar
* STORM-2335: Fix broken Topology visualization with empty ':transferred' in executor stats
* STORM-2336: Close Localizer and AsyncLocalizer when supervisor is shutting down
* STORM-2338: Subprocess exception handling is broken in storm.py on Windows environment
* STORM-2337: Broken documentation generation for storm-metrics-profiling-internal-actions.md and windows-users-guide.md
* STORM-2325: Logviewer doesn't consider 'storm.local.hostname'
* STORM-1742: More accurate 'complete latency'
* STORM-2176: Workers do not shutdown cleanly and worker hooks don't run when a topology is killed
102 changes: 101 additions & 1 deletion LICENSE
@@ -283,6 +283,106 @@ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS "AS IS" AND ANY EXPRESS OR IM

-----------------------------------------------------------------------

For js-yaml.min.js (storm-core/src/ui/public/js/)

(The MIT License)

Copyright (C) 2011-2015 by Vitaly Puzrin

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

-----------------------------------------------------------------------

For dagre.min.js (storm-core/src/ui/public/js/)

Copyright (c) 2012-2014 Chris Pettitt

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

-----------------------------------------------------------------------

For cytoscape.min.js and cytoscape-dagre.js (storm-core/src/ui/public/js/)

Copyright (c) 2016 The Cytoscape Consortium

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the “Software”), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

-----------------------------------------------------------------------

For esprima.js (storm-core/src/ui/public/js/)

Copyright JS Foundation and other contributors, https://js.foundation/

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-----------------------------------------------------------------------

For arbor.js and arbor-graphics.js (storm-core/src/ui/public/js/)

Copyright (c) 2011 Samizdat Drafting Co.
@@ -586,4 +686,4 @@


This product bundles PMML Sample Files, which are available under a
"3-clause BSD" license. For details, see http://dmg.org/documents/dmg-pmml-license-2016.pdf.
"3-clause BSD" license. For details, see http://dmg.org/documents/dmg-pmml-license-2016.pdf.
2 changes: 1 addition & 1 deletion bin/storm.py
@@ -260,7 +260,7 @@ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[]
try:
ret = sub.check_output(all_args, stderr=sub.STDOUT)
print(ret)
except sub.CalledProcessor as e:
except sub.CalledProcessError as e:
sys.exit(e.returncode)
else:
os.execvp(JAVA_CMD, all_args)
11 changes: 11 additions & 0 deletions dev-tools/rc/download-rc-directory.sh
@@ -1,4 +1,15 @@
#!/bin/bash
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

TARGET_URL=$1

11 changes: 11 additions & 0 deletions dev-tools/rc/verify-release-file.sh
@@ -1,4 +1,15 @@
#!/bin/bash
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

TARGET_FILE=$1

22 changes: 22 additions & 0 deletions docs/storm-kafka-client.md
@@ -345,3 +345,25 @@ Currently the Kafka spout has the following default values, which have shown
* offset.commit.period.ms = 30000 (30s)
* max.uncommitted.offsets = 10000000
<br/>
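
If these defaults need tuning for a particular workload, they can be overridden on the spout builder. A minimal sketch, assuming the `KafkaSpoutConfig.Builder` setters (`setOffsetCommitPeriodMs`, which appears elsewhere in this diff, and `setMaxUncommittedOffsets`); the broker address and topic are placeholders:

```java
KafkaSpoutConfig<String, String> tunedConf = KafkaSpoutConfig
    .builder("127.0.0.1:9092", "my-topic") // placeholder broker/topic
    .setOffsetCommitPeriodMs(30_000)       // offset.commit.period.ms
    .setMaxUncommittedOffsets(10_000_000)  // max.uncommitted.offsets
    .build();
```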

# Kafka AutoCommitMode

If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- and you want to remove the overhead of tuple tracking, then you can run a KafkaSpout with AutoCommitMode.

To enable it, you need to:
* set Config.TOPOLOGY_ACKERS to 0;
* enable *AutoCommitMode* in the Kafka consumer configuration.

Here's an example of enabling AutoCommitMode in the KafkaSpout:
```java
KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
    .builder(bootstrapServers, topics) // e.g. builder("127.0.0.1:9092", "my-topic")
    .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
    .build();
```
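
The snippet above covers the consumer side. As a minimal sketch of wiring both steps together -- the auto-commit spout plus a topology configuration with ackers disabled -- consider the following; the broker address, topic, and component names are illustrative placeholders, not values from this commit:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.topology.TopologyBuilder;

public class AutoCommitExample {
    public static void main(String[] args) throws Exception {
        // Consumer side: enable Kafka auto-commit (placeholder broker/topic).
        KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
                .builder("127.0.0.1:9092", "my-topic")
                .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
                .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(kafkaConf));

        // Storm side: zero ackers, so emitted tuples are never tracked.
        Config conf = new Config();
        conf.setNumAckers(0); // same effect as setting Config.TOPOLOGY_ACKERS to 0
        // Submit with StormSubmitter.submitTopology(...) or a LocalCluster as appropriate.
    }
}
```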

*Note that this is not exactly at-most-once processing in Storm: since offsets are committed periodically by the Kafka consumer, some tuples could be replayed if the KafkaSpout crashes.*



@@ -18,16 +18,13 @@

package org.apache.storm.kafka.trident;

import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
@@ -36,6 +33,13 @@
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;

public class TridentKafkaClientWordCountNamedTopics {
private static final String TOPIC_1 = "test-trident";
private static final String TOPIC_2 = "test-trident-1";
@@ -45,11 +49,23 @@ private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque() {
return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
}

private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc();

/**
* Needs to be serializable
*/
private static class JustValueFunc implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
@Override
public List<Object> apply(ConsumerRecord<String, String> record) {
return new Values(record.value());
}
}

protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, TOPIC_2)
.setGroupId("kafkaSpoutTestGroup")
.setGroupId("kafkaSpoutTestGroup_" + System.nanoTime())
.setMaxPartitionFectchBytes(200)
.setRecordTranslator((r) -> new Values(r.value()), new Fields("str"))
.setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
.setRetry(newRetryService())
.setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
@@ -77,7 +93,7 @@ protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyE

System.out.printf("Running with broker_url: [%s], topics: [%s, %s]\n", brokerUrl, topic1, topic2);

Config tpConf = LocalSubmitter.defaultConfig();
Config tpConf = LocalSubmitter.defaultConfig(true);

if (args.length == 4) { //Submit Remote
// Producers
@@ -102,11 +118,15 @@ protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyE
localSubmitter.submit(topic1Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic1));
localSubmitter.submit(topic2Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
// Consumer
localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
try {
localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
// print
localSubmitter.printResults(15, 1, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}

// print
new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
} finally {
// kill
localSubmitter.kill(topic1Tp);
@@ -47,10 +47,13 @@ public static LocalSubmitter newInstance() {
}

public static Config defaultConfig() {
return defaultConfig(false);
}

public static Config defaultConfig(boolean debug) {
final Config conf = new Config();
conf.setMaxSpoutPending(20);
conf.setMaxTaskParallelism(1);
conf.setNumWorkers(1);
conf.setDebug(debug);
return conf;
}

@@ -18,9 +18,7 @@

package org.apache.storm.kafka.trident;

import org.apache.storm.Config;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.starter.trident.DebugMemoryMapState;
import org.apache.storm.trident.Stream;
@@ -42,14 +40,6 @@
public class TridentKafkaConsumerTopology {
protected static final Logger LOG = LoggerFactory.getLogger(TridentKafkaConsumerTopology.class);

public static void submitRemote(String name, ITridentDataSource tridentSpout) {
try {
StormSubmitter.submitTopology(name, newTpConfig(), newTopology(null, tridentSpout));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* See {@link TridentKafkaConsumerTopology#newTopology(LocalDRPC, ITridentDataSource)}
*/
@@ -85,19 +75,11 @@ public boolean isKeep(TridentTuple tuple) {
}

private static TridentState addTridentState(TridentTopology tridentTopology, ITridentDataSource tridentSpout) {
final Stream spoutStream = tridentTopology.newStream("spout1", tridentSpout).parallelismHint(1);
final Stream spoutStream = tridentTopology.newStream("spout1", tridentSpout).parallelismHint(2);

return spoutStream.each(spoutStream.getOutputFields(), new Debug(true))
.each(new Fields("str"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new DebugMemoryMapState.Factory(), new Count(), new Fields("count"));
}

private static Config newTpConfig() {
Config conf = new Config();
conf.setMaxSpoutPending(20);
conf.setMaxTaskParallelism(1);
return conf;
}

}