storm中如何自定义数据分组
更新:HHH   时间:2023-1-7


今天就跟大家聊聊有关storm中如何自定义数据分组,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

数据流组

设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被bolts消费的)。一个数据流组指定了每个bolt会消费哪些数据流,以及如何消费它们。 

storm自带数据流组

随机数据流组

随机流组是最常用的数据流组。它只有一个参数(数据源组件),并且数据源会向随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。

 builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

 域数据流组

域数据流组允许你基于元组的一个或多个域控制如何把元组发送给bolts。它保证拥有相同域组合的值集发送给同一个bolt。回到单词计数器的例子,如果你用word域为数据流分组,word-normalizer bolt将只会把相同单词的元组发送给同一个word-counterbolt实例。

 builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGrouping("word-normalizer", new Fields("word"));

全部数据流组

全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向bolts发送信号。比如,你要刷新缓存,你可以向所有的bolts发送一个刷新缓存信号。在单词计数器的例子里,你可以使用一个全部数据流组,添加清除计数器缓存的功能 

builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGroupint("word-normalizer",new Fields("word"))
           .allGrouping("signals-spout","signals");

直接数据流组

这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组

 builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");

。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit。

public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        //对元组做出应答
        collector.ack(input);
    }
    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }

在prepare方法中计算任务数

 public void prepare(Map stormConf, TopologyContext context, 
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }

全局数据流组

全局数据流组把所有数据源创建的元组发送给单一目标实例(即拥有最低ID的任务)。

不分组

这个数据流组相当于随机数据流组。也就是说,使用这个数据流组时,并不关心数据流是如何分组的。

自定义数据流组

storm自定义数据流组和hadoop Partitioner分组很相似,storm自定义分组要实现CustomStreamGrouping接口,接口源码如下:

public   interface   CustomStreamGrouping  extends   Serializable {
 
    void   prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
 
    List<Integer> chooseTasks( int   taskId, List<Object> values);
}

targetTasks就是Storm运行时告诉你,当前有几个目标Task可以选择,每一个都给编上了数字编号。而 chooseTasks(int taskId, List values); 就是让你选择,你的这条数据values,是要哪几个目标Task处理?

这是我写的一个自定义分组,总是把数据分到第一个Task:

public   class   MyFirstStreamGrouping  implements   CustomStreamGrouping {
     private   static   Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping. class );
 
     private   List<Integer> tasks;
 
     @Override
     public   void   prepare(WorkerTopologyContext context, GlobalStreamId stream,
         List<Integer> targetTasks) {
         this .tasks = targetTasks;
         log.info(tasks.toString());
     }  
     @Override
     public   List<Integer> chooseTasks( int   taskId, List<Object> values) {
         log.info(values.toString());
         return   Arrays.asList(tasks.get( 0 ));
     }
}

从上面的代码可以看出,该自定义分组会把数据归并到第一个TaskArrays.asList(tasks.get(0));,也就是数据到达后总是被派发到第一组。和Hadoop不同的是,Storm允许一条数据被多个Task处理,因此返回值是List .就是让你来在提供的 'List targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。

第二个自定义分组,wordcount中使首字母相同的单词交给同一个bolt处理:

public class ModuleGrouping implements CustormStreamGrouping{
        int numTasks = 0;
        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }
        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            numTasks = targetTasks.size();
        }
    }

这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。

builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());

看完上述内容,你们对storm中如何自定义数据分组有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注天达云行业资讯频道,感谢大家的支持。

返回云计算教程...