小编给大家分享一下Storm-kafka中如何实现一个对于kafkaBroker动态读取的Class,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
实现一个对于kafkaBroker 动态读取的Class - DynamicBrokersReader
DynamicBrokersReader
package com.mixbox.storm.kafka;
import backtype.storm.Config;
import backtype.storm.utils.Utils;
import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
/**
* 动态的Broker读 我们维护了有一个与zk之间的连接,提供了获取指定topic的每一个partition正在活动着的leader所对应的broker
* 这样你有能力知道,当前的这些topic,哪一些broker是活动的 * @author Yin Shuai
*/
public class DynamicBrokersReader {

    public static final Logger LOG = LoggerFactory
            .getLogger(DynamicBrokersReader.class);

    // Curator client wrapping the ZooKeeper connection.
    private CuratorFramework _curator;
    // Root path under which kafka registers itself in ZooKeeper (typically "/brokers").
    private String _zkPath;
    // Topic whose partition/leader layout this reader resolves.
    private String _topic;

    /**
     * Creates a reader and starts a Curator/ZooKeeper connection.
     *
     * @param conf   storm configuration map; supplies the session timeout and
     *               retry policy values
     * @param zkStr  ZooKeeper connect string (host:port[,host:port...])
     * @param zkPath root path kafka brokers register under, e.g. "/brokers"
     * @param topic  topic whose partitions/leaders will be inspected
     * @throws RuntimeException if the ZooKeeper client cannot be created
     */
    public DynamicBrokersReader(Map conf, String zkStr, String zkPath,
            String topic) {
        _zkPath = zkPath;
        _topic = topic;
        try {
            _curator = CuratorFrameworkFactory
                    .newClient(
                            zkStr,
                            Utils.getInt(conf
                                    .get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
                            15000,
                            new RetryNTimes(
                                    Utils.getInt(conf
                                            .get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
                                    Utils.getInt(conf
                                            .get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
        } catch (IOException e) {
            // Fail fast with the cause attached. The original code only printed
            // the stack trace here and then hit a NullPointerException on
            // _curator.start() below, hiding the real connection error.
            throw new RuntimeException("Failed to create ZooKeeper client for "
                    + zkStr, e);
        }
        _curator.start();
    }

    /**
     * Path-only constructor. NOTE(review): this leaves {@code _curator} null,
     * so any method that touches ZooKeeper will NPE; presumably only the
     * path-building helpers are intended to be used on such an instance —
     * confirm against callers.
     *
     * @param zkPath root path kafka brokers register under
     */
    public DynamicBrokersReader(String zkPath) {
        this._zkPath = zkPath;
    }

    /**
     * Determines, for every partition of the configured topic, the host/port
     * of the partition's current leader broker, and collects them into a
     * {@link GlobalPartitionInformation}.
     *
     * @return mapping of partition number to leader {@link Broker}
     * @throws RuntimeException wrapping any ZooKeeper/parse failure
     */
    public GlobalPartitionInformation getBrokerInfo() {
        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
        try {
            // Number of partitions currently registered for the topic.
            int numPartitionsForTopic = getNumPartitions();
            // e.g. /brokers/ids
            String brokerInfoPath = brokerPath();
            for (int partition = 0; partition < numPartitionsForTopic; partition++) {
                // Resolve the broker id of this partition's leader.
                int leader = getLeaderFor(partition);
                // ZooKeeper node holding that broker's registration data.
                String leaderPath = brokerInfoPath + "/" + leader;
                try {
                    byte[] brokerData = _curator.getData().forPath(leaderPath);
                    /**
                     * brokerData is a JSON byte array such as:
                     * {"jmx_port":-1,"timestamp":"1403076810435"
                     * ,"host":"192.168.50.207","version":1,"port":9092}
                     */
                    Broker hp = getBrokerHost(brokerData);
                    // Record the leader broker for this partition.
                    globalPartitionInformation.addPartition(partition, hp);
                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
                    // Leader node may disappear during a re-election; log and
                    // continue with the remaining partitions.
                    LOG.error("Node {} does not exist ", leaderPath);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        LOG.info("Read partition info from zookeeper: "
                + globalPartitionInformation);
        return globalPartitionInformation;
    }

    /**
     * @return the number of partitions registered for the topic, i.e. the
     *         child count of {@link #partitionPath()}
     */
    private int getNumPartitions() {
        try {
            String topicBrokersPath = partitionPath();
            List<String> children = _curator.getChildren().forPath(
                    topicBrokersPath);
            return children.size();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return ZooKeeper path of the topic's partition list, e.g.
     *         /brokers/topics/storm-sentence/partitions
     */
    public String partitionPath() {
        return _zkPath + "/topics/" + _topic + "/partitions";
    }

    /**
     * Path holding the ids assigned to each broker at configuration time.
     *
     * @return e.g. /brokers/ids
     */
    public String brokerPath() {
        return _zkPath + "/ids";
    }

    /**
     * Reads the partition state node, e.g.
     * get /brokers/topics/distributedTopic/partitions/1/state {
     * "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1,
     * "version":1 }
     *
     * In kafka every partition has exactly one leader (and zero or more
     * followers); the leader serves all requests for that partition.
     *
     * @param partition partition number within the topic
     * @return broker id of the partition's current leader
     */
    private int getLeaderFor(long partition) {
        try {
            String topicBrokersPath = partitionPath();
            byte[] hostPortData = _curator.getData().forPath(
                    topicBrokersPath + "/" + partition + "/state");
            @SuppressWarnings("unchecked")
            Map<Object, Object> value = (Map<Object, Object>) JSONValue
                    .parse(new String(hostPortData, "UTF-8"));
            Integer leader = ((Number) value.get("leader")).intValue();
            return leader;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Closes the underlying Curator/ZooKeeper connection. */
    public void close() {
        _curator.close();
    }

    /**
     * Parses a broker registration node into a {@link Broker}, e.g.
     * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0 {
     * "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
     *
     * @param contents raw JSON bytes read from /brokers/ids/&lt;id&gt;
     * @return host/port pair of the broker
     */
    private Broker getBrokerHost(byte[] contents) {
        try {
            @SuppressWarnings("unchecked")
            Map<Object, Object> value = (Map<Object, Object>) JSONValue
                    .parse(new String(contents, "UTF-8"));
            String host = (String) value.get("host");
            // Cast via Number, not Long: json-simple may yield different
            // numeric types, and the hard (Long) cast used previously would
            // throw ClassCastException. This also matches getLeaderFor().
            Integer port = ((Number) value.get("port")).intValue();
            return new Broker(host, port);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}
对于以上代码须知:
1 : 我们持有了一个ZkPath , 在Storm-kafka的class之中我们默认的是/brokers
2 : _topic , 目前我们是针对的是Topic, 也就是说我们的partition,leader都是针对于单个Topic的
3:
1 int numPartitionsForTopic = getNumPartitions();
针对与一个Topic,首先我们要取当前的分区数,一般的情况,我们在kafka之中默认的分区数为2
2 String brokerInfoPath = brokerPath();
拿到 /brokers/ids 这个broker注册路径(其下的子节点是各broker的id)
3: for (int partition = 0; partition < numPartitionsForTopic; partition++) {
依次的遍历每一个分区
4:int leader = getLeaderFor(partition);String leaderPath = brokerInfoPath + "/" + leader;byte[] brokerData = _curator.getData().forPath(leaderPath);
再通过分区拿到领导者,以及领导者的路径,最后拿到领导者的数据:
我们举一个小例子
* 在这里, 我们拿到的brokerData为:
* {"jmx_port":-1,"timestamp":"1403076810435"
* ,"host":"192.168.50.207","version":1,"port":9092}
5:Broker hp = getBrokerHost(brokerData);
拿到某一个Topic自己的分区在kafka所对应的Broker,并且其封装到 globalPartitionInformation
6:globalPartitionInformation.addPartition(partition, hp);
GlobalPartitionInformaton底层维护了一个HashMap
简单的来说:DynamicBrokersReader 针对某一个Topic维护了 每一个分区 partition 所对应的 Broker
以上是“Storm-kafka中如何实现一个对于kafkaBroker动态读取的Class”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注天达云行业资讯频道!