This repository has been archived by the owner on Jun 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
调度定制化接口(0.9.4.1 及以下版本)
Longda edited this page Sep 1, 2014
·
1 revision
从JStorm 0.9.0 开始, JStorm 提供非常强大的调度功能, 基本上可以满足大部分的需求。
在学习如何使用新调度前, 麻烦先学习 JStorm 0.9.0介绍 提供哪些功能
JStorm的资源不在是以前worker单独的一个端口, 而是以4个维度展现,CPU/Memory/Disk/Net
默认一个task,一个cpu slot 当task消耗更多的cpu时,可以申请更多cpu slot。 示例代码如下:
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setCpuSlotsPerTask(totalBoltConf, 3); // 申请每个task 3个cpu slot
totalBolt.addConfigurations(totalBoltConf);
默认一个task,一个memory slot 当task消耗更多的memory时,可以申请更多memory slot。 示例代码如下:
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setMemSlotPerTask(totalBoltConf, 3); // 申请每个task 3个memory slot
totalBolt.addConfigurations(totalBoltConf);
默认task不申请磁盘slot,但当task 磁盘IO较重时,可以申请Disk slot 当task消耗更多的disk时,可以申请更多disk slot。 示例代码如下:
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setTaskAllocDisk(totalBoltConf, true); // 申请磁盘slot
totalBolt.addConfigurations(totalBoltConf);
一旦分配成功, 在task的prepare里面即可 获得分配的disk slot目录
public class TotalCount implements IRichBolt {
....
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
//...
String diskSlot = ConfigExtension.getTaskAssignDiskSlot(stormConf);
//...
}
可以强制某个component的task 运行在不同的节点上
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setTaskOnDifferentNode(totalBoltConf, true); // 申请task 运行在不同的节点上
totalBolt.addConfigurations(totalBoltConf);
对于一些小应用,可以强制这个topology的所有task运行在同一个节点上,从而节省网络开销
Config.setNumAckers(conf, ackerParal);
ConfigExtension.setUseSingleNode(conf, true); // 强制所有的task 运行在同一个节点上
StormSubmitter.submitTopology(streamName, conf,
builder.createTopology());
在某些情况下,可以自定义某个component的task分配到某些特定机器的特定端口,当指定的端口被占时或指定的机器资源不够时,nimbus会降级默认分配算法
Nimbus 分配算法如下:
- 优先使用自定义任务分配, 当资源无法满足需求时,该任务放到下一级任务分配算法
- 使用历史任务分配算法,如果打开使用历史任务属性开关后,则使用该算法, 当资源无法满足需求时,该任务放到下一级任务分配算法。
- 使用默认资源平衡算法, 计算每个supervisor上剩余资源权值, 取权值最高的supervisor进行分配。
示例代码如下:
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
List<ResourceAssignment> userDefineAssignments = new ArrayList<ResourceAssignment>();
for (int i = 0, base = 150; i < boltParal; i++, base++) {
ResourceAssignment assign = new ResourceAssignment();
assign.setCpuSlotNum(2);
assign.setMemSlotNum(2);
assign.setPort(6800 + i);
assign.setHostname("free-56-151.shucang.alipay.net"); //
userDefineAssignments.add(assign);
}
ConfigExtension.setUserDefineAssignment(totalBoltConf, userDefineAssignments); //申请使用自定义资源
totalBolt.addConfigurations(totalBoltConf);
可以预约上一次成功运行时的任务分配,上次task分配了什么资源,这次还是使用这些资源。 如果上次的资源被占或不能满足时,使用默认分配算法。
int boltParal = get("bolt.parallel", 1);
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).noneGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
Map totalBoltConf = new HashMap();
ConfigExtension.setUseOldAssignment(totalBoltConf, true); // 申请task 运行使用上一次资源
totalBolt.addConfigurations(totalBoltConf);
Config.setNumAckers(conf, ackerParal);
ConfigExtension.setUseOldAssignment(totalBoltConf, true); // 对整个topology所有task 优先使用上次资源
StormSubmitter.submitTopology(streamName, conf,
builder.createTopology());