这篇文章将为大家详细讲解 Storm 如何与 Kafka 进行整合。文章内容质量较高，因此小编分享给大家做个参考，希望大家阅读完这篇文章后对相关知识有一定的了解。
Storm 如何与 Kafka 进行整合：KafkaSpout 源码如下。
package com.mixbox.storm.kafka;
import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;
import java.util.*;
/**
* @author Yin Shuai
*/
public class KafkaSpout extends BaseRichSpout {
public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
/**
* 内部类,Message和Offset的偏移量对象
*
* @author Yin Shuai
*/
public static class MessageAndRealOffset {
public Message msg;
public long offset;
public MessageAndRealOffset(Message msg, long offset) {
this.msg = msg;
this.offset = offset;
}
}
/**
* 发射的枚举类
* @author Yin Shuai
*/
static enum EmitState {
EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
}
String _uuid = UUID.randomUUID().toString();
SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
// 分区的协调器,getMyManagedPartitions 拿到我所管理的分区
PartitionCoordinator _coordinator;
// 动态的分区链接:保存到kafka各个节点的连接,以及负责的topic的partition号码
DynamicPartitionConnections _connections;
// 提供了从zookeeper读写kafka 消费者信息的功能
ZkState _state;
// 上次更新的毫秒数
long _lastUpdateMs = 0;
// 当前的分区
int _currPartitionIndex = 0;
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;
}
@SuppressWarnings("unchecked")
@Override
public void open(Map conf, final TopologyContext context,
final SpoutOutputCollector collector) {
_collector = collector;
List<String> zkServers = _spoutConfig.zkServers;
// 初始化的时候如果zkServers 为空,那么初始化 默认的配置Zookeeper
if (zkServers == null) {
zkServers = new ArrayList<String>() {
{
add("192.168.50.144");
add("192.168.50.169");
add("192.168.50.207");
}
};
// zkServers =
// (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
System.out.println(" 使用的是Storm默认配置的Zookeeper List : " + zkServers);
}
Integer zkPort = _spoutConfig.zkPort;
// 在这里我们也同时 来检查zookeeper的端口是否为空
if (zkPort == null) {
zkPort = 2181;
// zkPort = ((Number)
// conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
Map stateConf = new HashMap(conf);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
// 通过保存的配置文件,我们持有了一个zookeeper的state,支持节点内容的创建和删除
_state = new ZkState(stateConf);
// 对于连接的维护
_connections = new DynamicPartitionConnections(_spoutConfig,
KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
// 拿到总共的任务次数
int totalTasks = context
.getComponentTasks(context.getThisComponentId()).size();
// 判断当前的主机是否是静态的statichost
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf,
_spoutConfig, _state, context.getThisTaskIndex(),
totalTasks, _uuid);
// 当你拿到的spoutConfig是zkhost的时候
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
_state, context.getThisTaskIndex(), totalTasks, _uuid);
}
context.registerMetric("kafkaOffset", new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(
_spoutConfig.topic, _connections);
@Override
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator
.getMyManagedPartitions();
Set<Partition> latestPartitions = new HashSet();
for (PartitionManager pm : pms) {
latestPartitions.add(pm.getPartition());
}
_kafkaOffsetMetric.refreshPartitions(latestPartitions);
for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setLatestEmittedOffset(
pm.getPartition(), pm.lastCompletedOffset());
}
return _kafkaOffsetMetric.getValueAndReset();
}
}, _spoutConfig.metricsTimeBucketSizeInSecs);
context.registerMetric("kafkaPartition", new IMetric() {
@Override
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator
.getMyManagedPartitions();
Map concatMetricsDataMaps = new HashMap();
for (PartitionManager pm : pms) {
concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
}
return concatMetricsDataMaps;
}
}, _spoutConfig.metricsTimeBucketSizeInSecs);
}
@Override
public void close() {
_state.close();
}
@Override
public void nextTuple() {
// Storm-spout 是从kafka 消费数据,把 kafka 的 consumer
// 当成是一个spout,并且向其他的bolt的发送数据
// 拿到当前我管理的这些PartitionsManager
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
// 对于每一个分区的 PartitionManager
// in case the number of managers decreased
// 当前的分区
_currPartitionIndex = _currPartitionIndex % managers.size();
// 拿到当前的分区,并且发送,这里把SpoutOutputCollector传递进去了,由他发射元祖
EmitState state = managers.get(_currPartitionIndex)
.next(_collector);
// 如果发送状态为:发送-还有剩余
if (state != EmitState.EMITTED_MORE_LEFT) {
_currPartitionIndex = (_currPartitionIndex + 1)
% managers.size();
}
// 如果发送的状态为: 发送-没有剩余
if (state != EmitState.NO_EMITTED) {
break;
}
}
long now = System.currentTimeMillis();
if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit();
}
}
@Override
public void ack(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.ack(id.offset);
}
}
@Override
public void fail(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.fail(id.offset);
}
}
@Override
public void deactivate() {
// 停止工作
commit();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
System.out.println(_spoutConfig.scheme.getOutputFields());
declarer.declare(_spoutConfig.scheme.getOutputFields());
}
private void commit() {
_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
manager.commit();
}
}
}
粗略阅读代码之后，下面对其进行详细分析：
1. KafkaSpout 中定义了一个静态内部类 MessageAndRealOffset，用于把一条 Kafka 消息（Message）和它对应的偏移量（offset）封装在一起：
/**
 * Value holder that pairs a Kafka {@link Message} with the offset recorded for it.
 */
public static class MessageAndRealOffset {
    /** The Kafka message. */
    public Message msg;
    /** The offset stored together with {@code msg}. */
    public long offset;

    /**
     * @param message the Kafka message to wrap
     * @param realOffset the offset to record for it
     */
    public MessageAndRealOffset(Message message, long realOffset) {
        msg = message;
        offset = realOffset;
    }
}
2. Spout 中还持有一个分区协调器 PartitionCoordinator：当 SpoutConfig 的 hosts 是 StaticHosts 时，实例化的是 StaticCoordinator；其他情况（默认情况）下实例化的是 ZkCoordinator。
关于 Storm 如何与 Kafka 进行整合就分享到这里了，希望以上内容对大家有一定的帮助，可以学到更多知识。如果觉得文章不错，可以把它分享出去让更多的人看到。