今天就跟大家聊聊有关storm中如何自定义数据分组,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
数据流组
设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被bolts消费的)。一个数据流组指定了每个bolt会消费哪些数据流,以及如何消费它们。
storm自带数据流组
随机数据流组
随机流组是最常用的数据流组。它只有一个参数(数据源组件),并且数据源会向随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
域数据流组
域数据流组允许你基于元组的一个或多个域控制如何把元组发送给bolts。它保证拥有相同域组合的值集发送给同一个bolt。回到单词计数器的例子,如果你用word域为数据流分组,word-normalizer bolt将只会把相同单词的元组发送给同一个word-counterbolt实例。
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word"));
全部数据流组
全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向bolts发送信号。比如,你要刷新缓存,你可以向所有的bolts发送一个刷新缓存信号。在单词计数器的例子里,你可以使用一个全部数据流组,添加清除计数器缓存的功能
builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGroupint("word-normalizer",new Fields("word"))
.allGrouping("signals-spout","signals");
直接数据流组
这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组
builder.setBolt("word-counter", new WordCounter(),2)
.directGrouping("word-normalizer");
。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit。
public void execute(Tuple input) {
...
for(String word : words){
if(!word.isEmpty()){
...
collector.emitDirect(getWordCountIndex(word),new Values(word));
}
}
//对元组做出应答
collector.ack(input);
}
public Integer getWordCountIndex(String word) {
word = word.trim().toUpperCase();
if(word.isEmpty()){
return 0;
}else{
return word.charAt(0) % numCounterTasks;
}
}
在prepare方法中计算任务数
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.numCounterTasks = context.getComponentTasks("word-counter");
}
全局数据流组
全局数据流组把所有数据源创建的元组发送给单一目标实例(即拥有最低ID的任务)。
不分组
这个数据流组相当于随机数据流组。也就是说,使用这个数据流组时,并不关心数据流是如何分组的。
自定义数据流组
storm自定义数据流组和hadoop Partitioner分组很相似,storm自定义分组要实现CustomStreamGrouping接口,接口源码如下:
public
interface
CustomStreamGrouping
extends
Serializable {
void
prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
List<Integer> chooseTasks(
int
taskId, List<Object> values);
}
targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 chooseTasks(int taskId, List values); 就是让你选择,你的这条数据values,是要哪几个目标Task处理?
这是我写的一个自定义分组,总是把数据分到第一个Task:
public
class
MyFirstStreamGrouping
implements
CustomStreamGrouping {
private
static
Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.
class
);
private
List<Integer> tasks;
@Override
public
void
prepare(WorkerTopologyContext context, GlobalStreamId stream,
List<Integer> targetTasks) {
this
.tasks = targetTasks;
log.info(tasks.toString());
}
@Override
public
List<Integer> chooseTasks(
int
taskId, List<Object> values) {
log.info(values.toString());
return
Arrays.asList(tasks.get(
0
));
}
}
从上面的代码可以看出,该自定义分组会把数据归并到第一个TaskArrays.asList(tasks.get(0));,也就是数据到达后总是被派发到第一组。和Hadoop不同的是,Storm允许一条数据被多个Task处理,因此返回值是List .就是让你来在提供的 'List targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。
第二个自定义分组,wordcount中使首字母相同的单词交给同一个bolt处理:
public class ModuleGrouping implements CustormStreamGrouping{
int numTasks = 0;
@Override
public List<Integer> chooseTasks(List<Object> values) {
List<Integer> boltIds = new ArrayList<Integer>();
if(values.size()>0){
String str = values.get(0).toString();
if(str.isEmpty()){
boltIds.add(0);
}else{
boltIds.add(str.charAt(0) % numTasks);
}
}
return boltIds;
}
@Override
public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
numTasks = targetTasks.size();
}
}
这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。
builder.setBolt("word-normalizer", new WordNormalizer())
.customGrouping("word-reader", new ModuleGrouping());
看完上述内容,你们对storm中如何自定义数据分组有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注天达云行业资讯频道,感谢大家的支持。