这篇文章主要介绍Storm-kafka中如何封装DynamicBrokerReader类,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
在细节上把握 DynamicBrokerReder的封装类 - ZkBrokerReader
package com.mixbox.storm.kafka.trident;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.DynamicBrokersReader;
import com.mixbox.storm.kafka.ZkHosts;
import java.util.Map;
/**
* 2014/07/22
* 在ZK中间拿到 GlobalPartitionInformation
*
* ZkBrokerReader 是对于DynamicBrokersReader的一个简单的封装
* @author Yin Shuai
*/
public class ZkBrokerReader implements IBrokerReader {
public static final Logger LOG = LoggerFactory
.getLogger(ZkBrokerReader.class);
GlobalPartitionInformation cachedBrokers;
DynamicBrokersReader reader;
long lastRefreshTimeMs;
long refreshMillis;
/**
*
* @param conf
* @param topic
* 指定topic的zkBrokerReader
* @param hosts
*/
public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
reader = new DynamicBrokersReader(conf, hosts.brokerZkStr,
hosts.brokerZkPath, topic);
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = System.currentTimeMillis();
refreshMillis = hosts.refreshFreqSecs * 1000L;
}
@Override
public GlobalPartitionInformation getCurrentBrokers() {
long currTime = System.currentTimeMillis();
// 很简单, 指定了你多长时间开始去刷新Brokerlibiao
if (currTime > lastRefreshTimeMs + refreshMillis) {
LOG.info("brokers need refreshing because " + refreshMillis
+ "ms have expired");
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = currTime;
}
return cachedBrokers;
}
@Override
public void close() {
reader.close();
}
}
总览我们的Code :
ZkBrokerReader 是对于 DynamicBrokersReader的一个简单封装,ZkBrokerReader之中持有2个主要的Class
1 GlobalPartitionInformatio cachedBroker;
2 DynamicBrokersReader reader;
3 long lastRefreshTimeMs; 最新的刷新时间
lastRefreshTimeMs = System.currentTimeMillis(); 最新的刷新时间为系统的当前时间
4 long refreshMillis
refreshMillis = host.refreshFreqSecs * 1000L 设定刷新的毫秒数为
5
public GlobalPartitionInformation getCurrentBrokers() {
long currTime = System.currentTimeMillis();
// 很简单, 指定了你多长时间开始去刷新Brokerlibiao
if (currTime > lastRefreshTimeMs + refreshMillis) {
LOG.info("brokers need refreshing because " + refreshMillis
+ "ms have expired");
cachedBrokers = reader.getBrokerInfo();
lastRefreshTimeMs = currTime;
}
return cachedBrokers;
}
每一次调用getCurrentBrokers,首先会取System.currentTimeMillis 当当前的系统时间超过了 最早的刷新时间+刷新
的间隔,就会再次的去跟新:
cachedBrokers = reader.getBrokerInfo(); getBrokerInfo()方法每调用一次,也就重新在zk之中重新去取
一次。
ZkBrokerReader是对于DynamicBrokerReader的一个封装,DynamicBrokerReader的Dynamic性质并不程序动态的因数,而只是简单在读取ZK数据的过程之中,Zk数据已经动态的发生变化?
以上是“Storm-kafka中如何封装DynamicBrokerReader类”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注天达云行业资讯频道!