This article walks through how to implement a stream-based statistics system with Storm, using redis as the data source.
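1: Initial preparation:
1 If possible, make sure a redis cluster is installed
2 Set up your Storm development environment
3 Make sure the network is open: host to host, and between Storm and redis
2: Business background:
1 Here we simulate a stream-style data processing pipeline
2 The source data is stored in our redis cluster
3 The emitted data format is: ip,url,client_key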
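To make the record format concrete, here is a minimal sketch of how one click could be encoded as the JSON string that the spout later pops from the redis list "count". The class name ClickRecordFormat and the sample values are hypothetical; only the key names ("ip", "url", "clientKey") come from the Fields class used in this article.

```java
// Hypothetical helper, not part of the original project: formats one click
// record as the JSON string that ClickSpout pops from the redis list "count".
final class ClickRecordFormat {

    // Minimal escaping: handles backslashes and double quotes only.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Key names match storm.cookbook.Fields: "ip", "url", "clientKey".
    static String toJson(String ip, String url, String clientKey) {
        return "{\"ip\":\"" + escape(ip) + "\","
                + "\"url\":\"" + escape(url) + "\","
                + "\"clientKey\":\"" + escape(clientKey) + "\"}";
    }

    public static void main(String[] args) {
        // The sample values are made up for illustration.
        System.out.println(toJson("192.168.1.10", "http://example.com/index", "client-1"));
    }
}
```

A string like this can be pushed onto the list with the redis LPUSH command. Because the spout reads with RPOP, pushing on the left gives first-in, first-out order.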
The data emitter (spout):
package storm.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Values;
import backtype.storm.tuple.Fields;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;
import java.util.Map;
import org.apache.log4j.Logger;
/**
* ClickSpout reads the data it needs from redis
*/
public class ClickSpout extends BaseRichSpout {
private static final long serialVersionUID = -6200450568987812474L;
public static Logger LOG = Logger.getLogger(ClickSpout.class);
// We use the jedis client to talk to redis
private Jedis jedis;
// redis host
private String host;
// redis port
private int port;
// Spout output collector
private SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// The emitted tuple format is
// IP,URL,CLIENT_KEY
outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY));
}
@Override
public void open(Map conf, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
host = conf.get(Conf.REDIS_HOST_KEY).toString();
port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
this.collector = spoutOutputCollector;
connectToRedis();
}
private void connectToRedis() {
jedis = new Jedis(host, port);
}
@Override
public void nextTuple() {
String content = jedis.rpop("count");
if (content == null || "nil".equals(content)) {
// The list is empty: back off briefly instead of spinning
try {
Thread.sleep(300);
} catch (InterruptedException e) {
// Restore the interrupt flag so the worker can shut down cleanly
Thread.currentThread().interrupt();
}
} else {
// Parse the string popped from redis into a JSON object
JSONObject obj = (JSONObject) JSONValue.parse(content);
String ip = obj.get(storm.cookbook.Fields.IP).toString();
String url = obj.get(storm.cookbook.Fields.URL).toString();
String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY)
.toString();
LOG.debug("emitting click for clientKey " + clientKey);
// Values is a List<Object> that becomes the tuple
collector.emit(new Values(ip, url, clientKey));
}
}
}
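Points to note in this process:
1 In the open method we initialize host, port and collector, then call connectToRedis to open the connection to the redis database
2 In nextTuple we pop one record, parse it into a JSON object, extract ip, url and clientKey, and wrap them in a Values object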
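The pop-or-back-off pattern in nextTuple can be tried without Storm or redis. The sketch below is an in-memory stand-in under stated assumptions: a Deque plays the role of the redis list, a plain List plays the role of the collector, and the class name PollingSpoutSketch is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory sketch of the nextTuple pattern: pop one record, or back off when
// the queue is empty. All names here are hypothetical.
final class PollingSpoutSketch {
    private final Deque<String> queue;                       // stand-in for the redis list "count"
    private final List<String> emitted = new ArrayList<>();  // stand-in for collector.emit
    private int idleCalls = 0;

    PollingSpoutSketch(Deque<String> queue) {
        this.queue = queue;
    }

    // Each call handles at most one record.
    void nextTuple() {
        String content = queue.pollLast();  // like RPOP: null when the list is empty
        if (content == null) {
            idleCalls++;
            try {
                Thread.sleep(5);            // short back-off (the spout above uses 300 ms)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return;
        }
        emitted.add(content);
    }

    List<String> emitted() { return emitted; }

    int idleCalls() { return idleCalls; }

    public static void main(String[] args) {
        Deque<String> q = new ArrayDeque<>();
        q.addFirst("a");  // like LPUSH
        q.addFirst("b");
        PollingSpoutSketch spout = new PollingSpoutSketch(q);
        for (int i = 0; i < 3; i++) {
            spout.nextTuple();
        }
        System.out.println(spout.emitted() + " idle=" + spout.idleCalls());
    }
}
```

Records come out in the order they were pushed, and the third call finds the queue empty and only sleeps.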
Now let's look at how the data flows:
Once the data has been read by clickSpout, we use two bolts
1 : repeatVisitBolt
2 : geographyBolt
which both read from the same data source: clickSpout
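The article does not show the topology definition, so the following is only a plain-Java illustration of the idea, not Storm's API: when two consumers subscribe to the same source, each of them receives every emitted record. FanOutSketch and Click are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of fan-out; FanOutSketch and Click are hypothetical names.
final class FanOutSketch {

    // One click record with the three fields the spout declares.
    static final class Click {
        final String ip;
        final String url;
        final String clientKey;

        Click(String ip, String url, String clientKey) {
            this.ip = ip;
            this.url = url;
            this.clientKey = clientKey;
        }
    }

    private final List<Consumer<Click>> subscribers = new ArrayList<>();

    void subscribe(Consumer<Click> consumer) {
        subscribers.add(consumer);
    }

    // Every subscriber gets its own delivery of every click.
    void emit(Click click) {
        for (Consumer<Click> subscriber : subscribers) {
            subscriber.accept(click);
        }
    }

    public static void main(String[] args) {
        FanOutSketch spout = new FanOutSketch();
        List<String> repeatVisitInput = new ArrayList<>();
        List<String> geographyInput = new ArrayList<>();
        // The first consumer cares about url and clientKey, the second about ip.
        spout.subscribe(c -> repeatVisitInput.add(c.url + ":" + c.clientKey));
        spout.subscribe(c -> geographyInput.add(c.ip));
        spout.emit(new Click("10.0.0.1", "/home", "client-1"));
        System.out.println(repeatVisitInput + " " + geographyInput);
    }
}
```

Each consumer picks only the fields it needs from the shared record, which is how the two pipelines in this article diverge after the spout.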
3: A closer look at repeatVisitBolt
package storm.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;
import java.util.Map;
public class RepeatVisitBolt extends BaseRichBolt {
private OutputCollector collector;
private Jedis jedis;
private String host;
private int port;
@Override
public void prepare(Map conf, TopologyContext topologyContext,
OutputCollector outputCollector) {
this.collector = outputCollector;
host = conf.get(Conf.REDIS_HOST_KEY).toString();
port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
connectToRedis();
}
private void connectToRedis() {
jedis = new Jedis(host, port);
jedis.connect();
}
public boolean isConnected() {
if (jedis == null)
return false;
return jedis.isConnected();
}
@Override
public void execute(Tuple tuple) {
String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
String clientKey = tuple
.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
String url = tuple.getStringByField(storm.cookbook.Fields.URL);
String key = url + ":" + clientKey;
String value = jedis.get(key);
// Look the key up in redis; if it is absent, record a new visit and mark it unique.
if (value == null) {
jedis.set(key, "visited");
collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
} else {
collector
.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,
storm.cookbook.Fields.UNIQUE));
}
}
Here we combine url and clientKey into a key of the form "url:clientKey" and look it up in redis. If the key is not there, we set it in redis and mark the visit as "unique".
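The first-visit check can be sketched in memory as follows. This is an illustration under assumptions, not the bolt itself: a concurrent Set stands in for the redis key space, and RepeatVisitSketch is a hypothetical name. The sketch checks and records the key in a single call, which avoids the window between a separate get and set when several bolt instances run in parallel; redis offers the SETNX command for the same purpose.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch of the repeat-visit check; all names are hypothetical.
final class RepeatVisitSketch {
    // Stand-in for the redis key space.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Same key layout as the bolt: url + ":" + clientKey.
    static String key(String url, String clientKey) {
        return url + ":" + clientKey;
    }

    // Returns true only the first time a (url, clientKey) pair is seen.
    // Set.add checks and inserts in one step.
    boolean isUnique(String url, String clientKey) {
        return seen.add(key(url, clientKey));
    }

    public static void main(String[] args) {
        RepeatVisitSketch visits = new RepeatVisitSketch();
        System.out.println(visits.isUnique("/home", "client-1"));
        System.out.println(visits.isUnique("/home", "client-1"));
        System.out.println(visits.isUnique("/home", "client-2"));
    }
}
```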
4: VisitStatsBolt, which keeps the running totals:
package storm.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class VisitStatsBolt extends BaseRichBolt {
private OutputCollector collector;
private int total = 0;
private int uniqueCount = 0;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
// The upstream bolt has already decided whether this visit is unique
boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
total++;
if (unique) {
uniqueCount++;
}
collector.emit(new Values(total,uniqueCount));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,
storm.cookbook.Fields.TOTAL_UNIQUE));
}
}
Every tuple increments the total count; on a first visit (unique is true) the UV count is incremented as well.
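A short worked example of the counting rule, written as a standalone sketch (VisitStatsSketch is a hypothetical name). It takes the unique flag as a string, the same way the bolt receives it from upstream.

```java
// Standalone sketch of the total/unique counters; the class name is hypothetical.
final class VisitStatsSketch {
    private int total = 0;
    private int uniqueCount = 0;

    // Records one visit and returns {total, uniqueCount} after the update.
    // The flag arrives as the string "true" or "false".
    int[] record(String uniqueFlag) {
        boolean unique = Boolean.parseBoolean(uniqueFlag);
        total++;
        if (unique) {
            uniqueCount++;
        }
        return new int[] { total, uniqueCount };
    }

    public static void main(String[] args) {
        VisitStatsSketch stats = new VisitStatsSketch();
        for (String flag : new String[] { "true", "false", "true" }) {
            int[] counts = stats.record(flag);
            System.out.println("total=" + counts[0] + " unique=" + counts[1]);
        }
    }
}
```

After the sequence true, false, true the total is 3 and the unique count is 2.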
5: Next, let's look at the second pipeline:
package storm.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.json.simple.JSONObject;
import storm.cookbook.IPResolver;
import java.util.Map;
/**
* User: yin shaui Date: 2014/05/21 Time: 8:58 AM
*/
public class GeographyBolt extends BaseRichBolt {
// IP resolver
private IPResolver resolver;
private OutputCollector collector;
public GeographyBolt(IPResolver resolver) {
this.resolver = resolver;
}
@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
// 1 Take the ip from the upstream tuple
String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
// 2 Resolve the ip to a JSON object; the resolver returns null on failure
JSONObject json = resolver.resolveIP(ip);
if (json == null) {
return;
}
// 3 Pack city and country into a new tuple, i.e. a Values object
String city = (String) json.get(storm.cookbook.Fields.CITY);
String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);
collector.emit(new Values(country, city));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
// Declares the format of the tuples this bolt emits
outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,
storm.cookbook.Fields.CITY));
}
}
The bolt above converts an IP into a CITY and a COUNTRY.
package storm.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class GeoStatsBolt extends BaseRichBolt {
private class CountryStats {
// Total number of visits seen for this country
private int countryTotal = 0;
private static final int COUNT_INDEX = 0;
private static final int PERCENTAGE_INDEX = 1;
private String countryName;
public CountryStats(String countryName) {
this.countryName = countryName;
}
private Map<String, List<Integer>> cityStats = new HashMap<String, List<Integer>>();
/**
* @param cityName
*/
public void cityFound(String cityName) {
countryTotal++;
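// City already seen: increment its count
if (cityStats.containsKey(cityName)) {
cityStats.get(cityName)
.set(COUNT_INDEX,
cityStats.get(cityName).get(COUNT_INDEX)
.intValue() + 1);
// First visit from this city: start at count 1, percentage 0
} else {
List<Integer> list = new LinkedList<Integer>();
list.add(1);
list.add(0);
cityStats.put(cityName, list);
}
// Store this city's share of the country total as a whole percentage (0-100);
// casting the raw ratio to int would truncate it to 0 for every share below 100%
double percent = 100.0 * cityStats.get(cityName).get(COUNT_INDEX)
/ countryTotal;
cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) Math.round(percent));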
}
/**
* @return the total number of visits for this country
*/
public int getCountryTotal() {
return countryTotal;
}
/**
* @param cityName the city to look up
* @return the number of visits for that city
*/
public int getCityTotal(String cityName) {
return cityStats.get(cityName).get(COUNT_INDEX).intValue();
}
public String toString() {
return "Total Count for " + countryName + " is "
+ Integer.toString(countryTotal) + "\n" + "Cities: "
+ cityStats.toString();
}
}
private OutputCollector collector;
// CountryStats is the inner class defined above
private Map<String, CountryStats> stats = new HashMap<String, CountryStats>();
@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
String city = tuple.getStringByField(storm.cookbook.Fields.CITY);
// If the country has not been seen yet, create its statistics entry
if (!stats.containsKey(country)) {
stats.put(country, new CountryStats(country));
}
// Update the statistics: cityFound records one visit for this city
stats.get(country).cityFound(city);
collector.emit(new Values(country,
stats.get(country).getCountryTotal(), city, stats.get(country)
.getCityTotal(city)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
storm.cookbook.Fields.COUNTRY,
storm.cookbook.Fields.COUNTRY_TOTAL,
storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL));
}
}
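CountryStats stores each city's percentage as an integer at the moment that city is updated, so the stored shares of the other cities are not refreshed when the country total grows. One possible alternative, sketched below under assumptions, keeps only the counts and computes the share on demand as a double. CountryStatsSketch and cityPercent are hypothetical names, and the city names in main are sample data.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-country statistics with on-demand percentages; names are hypothetical.
final class CountryStatsSketch {
    private int countryTotal = 0;
    private final Map<String, Integer> cityCounts = new HashMap<>();

    // Records one visit from the given city.
    void cityFound(String cityName) {
        countryTotal++;
        cityCounts.merge(cityName, 1, Integer::sum);
    }

    int getCountryTotal() {
        return countryTotal;
    }

    // Returns 0 for a city that has not been seen.
    int getCityTotal(String cityName) {
        return cityCounts.getOrDefault(cityName, 0);
    }

    // The city's share of the country total, in percent, computed when asked for.
    double cityPercent(String cityName) {
        if (countryTotal == 0) {
            return 0.0;
        }
        return 100.0 * getCityTotal(cityName) / countryTotal;
    }

    public static void main(String[] args) {
        CountryStatsSketch stats = new CountryStatsSketch();
        stats.cityFound("Beijing");
        stats.cityFound("Beijing");
        stats.cityFound("Beijing");
        stats.cityFound("Shanghai");
        System.out.println("Beijing share: " + stats.cityPercent("Beijing"));
        System.out.println("Shanghai share: " + stats.cityPercent("Shanghai"));
    }
}
```

Computing the share at read time costs one division per lookup and removes the need for a second list slot per city.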
That covers the geographic statistics. The other helper classes used by the program follow.
package storm.cookbook;
/**
* Field names shared by the spout and the bolts
*/
public class Fields {
public static final String IP = "ip";
public static final String URL = "url";
public static final String CLIENT_KEY = "clientKey";
public static final String COUNTRY = "country";
public static final String COUNTRY_NAME = "country_name";
public static final String CITY = "city";
// Whether the visit is unique (first time seen)
public static final String UNIQUE = "unique";
// Total count for the country
public static final String COUNTRY_TOTAL = "countryTotal";
// Total count for the city
public static final String CITY_TOTAL = "cityTotal";
// Total number of visits
public static final String TOTAL_COUNT = "totalCount";
// Total number of unique visits
public static final String TOTAL_UNIQUE = "totalUnique";
}
package storm.cookbook;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
public class HttpIPResolver implements IPResolver, Serializable {
static String url = "http://api.hostip.info/get_json.php";
@Override
public JSONObject resolveIP(String ip) {
URL geoUrl = null;
BufferedReader in = null;
try {
geoUrl = new URL(url + "?ip=" + ip);
URLConnection connection = geoUrl.openConnection();
in = new BufferedReader(new InputStreamReader(
connection.getInputStream()));
// Parse the response body straight from the reader; the finally block closes it
JSONObject json = (JSONObject) JSONValue.parse(in);
return json;
} catch (IOException e) {
e.printStackTrace();
} finally {
// Close the reader only if it was actually opened
if (in != null) {
try {
in.close();
} catch (IOException e) {
}
}
}
return null;
}
}
package storm.cookbook;
import org.json.simple.JSONObject;
/**
* User: admin
* Date: 2012/12/07
* Time: 5:29 PM
*/
public interface IPResolver {
public JSONObject resolveIP(String ip);
}