-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
executable file
·99 lines (65 loc) · 2.51 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import debuglog from 'debuglog';
import Bolt from './lib/bolt';
import Spout from './lib/spout';
import Dispatcher from './lib/dispatcher';
import LocalCluster from './lib/local_cluster';
import StormSubmitter from './lib/storm_submitter';
import TopologyBuilder from './lib/topology_builder';
const debug = debuglog('cyclone');
const DEFAULT_OPTIONS = {
name: 'topology',
config: {}
};
export default {
Bolt,
Spout,
Dispatcher,
TopologyBuilder,
StormSubmitter,
LocalCluster,
run(builder, options = { nimbus: {}, config: {}}) {
options = Object.assign(DEFAULT_OPTIONS, options);
let argv = process.argv.slice(2);
if (argv.length) {
let topology = builder.createTopology();
if (argv[0] === '--local') {
// OPTION 1: Called locally to start a LocalCluster
let cluster = new LocalCluster();
cluster.submitTopology(topology, options);
cluster.on('connected', () => {
debug('Connected');
process.once('SIGINT', function () {
cluster.shutdown();
});
});
cluster.on('error', err => {
debug(err.stack);
});
cluster.on('exit', () => {
debug('Local cluster exited.');
process.exit();
});
} else {
// OPTION 2: Called by `storm shell`
let [ host, port, uploadedJarLocation ] = argv;
let nimbus = { host, port };
debug(`Submitting '${options.name}' (${uploadedJarLocation}) to ${host}:${port}.`);
StormSubmitter.submitTopology(options.name, uploadedJarLocation, { nimbus, config: options.config }, topology, function (err, _) {
if (err) {
throw err;
}
debug(`Topology '${options.name}' submitted.`);
});
}
} else {
// OPTION 3: Invoked as a task.
let dispatcher = new Dispatcher(function (conf, context, done) {
let id = context['task->component'][context.taskid];
let component = builder.bolts[id] || builder.spouts[id] || builder.state_spouts[id];
component.attach(this);
component.initialize(conf, context, done);
});
dispatcher.run();
}
}
}