本篇内容介绍了“storm实时排序TopN怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
阅读背景:1 您需要了解TOP 使用的场景
2 您需要了解当前的TOPN 处理,和定时区间处理的区别
看代码说话
package com.cc.storm;
import com.cc.storm.bolt.MergeBolt;
import com.cc.storm.bolt.RankBolt;
import com.cc.storm.bolt.RollingAllCountBolt;
import com.cc.storm.bolt.RollingCountBolt;
import com.cc.storm.spout.RandomEmitSpout;
import com.cc.storm.spout.RedisPubSubSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* ToPN是一种常见模式,是对流式数据进行“Streaming topN”的计算:
* 比如要计算的是最近一段时间内的热门话题,热门点击图片,热门商品浏览,热门商品购买
*
* 既然敢要实时的处理,【】【】【】【】【】[] 【】 【】【】【】【】 [] 【】【】【】【】 【】 []
*
* @author Yin Shuai
*/
public class TOP10 {
public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException, InterruptedException {
final int TOP_N = 10;
final int time = 1;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("$datasource$", new RandomEmitSpout(), 1);
builder.setBolt("$count$", new RollingCountBolt(3, time), 1)
.fieldsGrouping("$datasource$", new Fields("merchandiseIDS"));
builder.setBolt("$rank$", new RankBolt(TOP_N), 2).fieldsGrouping(
"$count$", new Fields("merchandiseID"));
builder.setBolt("$merge$", new MergeBolt(TOP_N)).globalGrouping(
"$rank$");
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(2);
conf.setMaxSpoutPending(5000);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf,
builder.createTopology());
Thread.sleep(5000);
}
}
整个处理的流程如图:
“storm实时排序TopN怎么使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注天达云网站,小编将为大家输出更多高质量的实用文章!