Storm如何和Kafka进行整合
更新:HHH   时间:2023-1-7


这篇文章将为大家详细讲解有关Storm如何和Kafka进行整合,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

 对于Storm 如何和Kafka进行整合

package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;

import java.util.*;

/**
 * @author Yin Shuai
 */

public class KafkaSpout extends BaseRichSpout {

	public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);

	/**
	 * 内部类,Message和Offset的偏移量对象
	 * 
	 * @author Yin Shuai
	 */

	public static class MessageAndRealOffset {
		public Message msg;
		public long offset;

		public MessageAndRealOffset(Message msg, long offset) {
			this.msg = msg;
			this.offset = offset;
		}
	}

	/**
	 * 发射的枚举类
	 * @author Yin Shuai
	 */
	static enum EmitState {
		EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
	}

	String _uuid = UUID.randomUUID().toString();
	
	SpoutConfig _spoutConfig;
	
	SpoutOutputCollector _collector;

	// 分区的协调器,getMyManagedPartitions 拿到我所管理的分区
	PartitionCoordinator _coordinator;

	// 动态的分区链接:保存到kafka各个节点的连接,以及负责的topic的partition号码
	DynamicPartitionConnections _connections;

	// 提供了从zookeeper读写kafka 消费者信息的功能
	ZkState _state;

	// 上次更新的毫秒数
	long _lastUpdateMs = 0;

	// 当前的分区
	int _currPartitionIndex = 0;

	public KafkaSpout(SpoutConfig spoutConf) {
		_spoutConfig = spoutConf;
	}

	@SuppressWarnings("unchecked")
	@Override
	public void open(Map conf, final TopologyContext context,
			final SpoutOutputCollector collector) {
		_collector = collector;

		List<String> zkServers = _spoutConfig.zkServers;

		// 初始化的时候如果zkServers 为空,那么初始化 默认的配置Zookeeper
		if (zkServers == null) {

			zkServers = new ArrayList<String>() {

				{
					add("192.168.50.144");
					add("192.168.50.169");
					add("192.168.50.207");
				}
			};

			// zkServers =
			// (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
			System.out.println(" 使用的是Storm默认配置的Zookeeper List : " + zkServers);

		}

		Integer zkPort = _spoutConfig.zkPort;

		// 在这里我们也同时 来检查zookeeper的端口是否为空
		if (zkPort == null) {

			zkPort = 2181;
			// zkPort = ((Number)
			// conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
		}

		Map stateConf = new HashMap(conf);

		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);

		// 通过保存的配置文件,我们持有了一个zookeeper的state,支持节点内容的创建和删除
		_state = new ZkState(stateConf);

		// 对于连接的维护
		_connections = new DynamicPartitionConnections(_spoutConfig,
				KafkaUtils.makeBrokerReader(conf, _spoutConfig));

		// using TransactionalState like this is a hack
		// 拿到总共的任务次数

		int totalTasks = context
				.getComponentTasks(context.getThisComponentId()).size();

		// 判断当前的主机是否是静态的statichost
		if (_spoutConfig.hosts instanceof StaticHosts) {
			_coordinator = new StaticCoordinator(_connections, conf,
					_spoutConfig, _state, context.getThisTaskIndex(),
					totalTasks, _uuid);

			// 当你拿到的spoutConfig是zkhost的时候
		} else {
			_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
					_state, context.getThisTaskIndex(), totalTasks, _uuid);
		}

		context.registerMetric("kafkaOffset", new IMetric() {
			KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(
					_spoutConfig.topic, _connections);

			@Override
			public Object getValueAndReset() {
				List<PartitionManager> pms = _coordinator
						.getMyManagedPartitions();
				Set<Partition> latestPartitions = new HashSet();
				for (PartitionManager pm : pms) {
					latestPartitions.add(pm.getPartition());
				}
				_kafkaOffsetMetric.refreshPartitions(latestPartitions);
				for (PartitionManager pm : pms) {
					_kafkaOffsetMetric.setLatestEmittedOffset(
							pm.getPartition(), pm.lastCompletedOffset());
				}
				return _kafkaOffsetMetric.getValueAndReset();
			}
		}, _spoutConfig.metricsTimeBucketSizeInSecs);

		context.registerMetric("kafkaPartition", new IMetric() {
			@Override
			public Object getValueAndReset() {
				List<PartitionManager> pms = _coordinator
						.getMyManagedPartitions();
				Map concatMetricsDataMaps = new HashMap();
				for (PartitionManager pm : pms) {
					concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
				}
				return concatMetricsDataMaps;
			}
		}, _spoutConfig.metricsTimeBucketSizeInSecs);
	}

	@Override
	public void close() {
		_state.close();
	}

	@Override
	public void nextTuple() {
		// Storm-spout 是从kafka 消费数据,把 kafka 的 consumer
		// 当成是一个spout,并且向其他的bolt的发送数据

		// 拿到当前我管理的这些PartitionsManager
		List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
		for (int i = 0; i < managers.size(); i++) {

			// 对于每一个分区的 PartitionManager

			// in case the number of managers decreased
			// 当前的分区

			_currPartitionIndex = _currPartitionIndex % managers.size();

			// 拿到当前的分区,并且发送,这里把SpoutOutputCollector传递进去了,由他发射元祖
			EmitState state = managers.get(_currPartitionIndex)
					.next(_collector);

			// 如果发送状态为:发送-还有剩余
			if (state != EmitState.EMITTED_MORE_LEFT) {
				_currPartitionIndex = (_currPartitionIndex + 1)
						% managers.size();
			}

			// 如果发送的状态为: 发送-没有剩余
			if (state != EmitState.NO_EMITTED) {
				break;
			}
		}

		long now = System.currentTimeMillis();
		if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
			commit();
		}
	}

	@Override
	public void ack(Object msgId) {
		KafkaMessageId id = (KafkaMessageId) msgId;
		PartitionManager m = _coordinator.getManager(id.partition);
		if (m != null) {
			m.ack(id.offset);
		}
	}

	@Override
	public void fail(Object msgId) {
		KafkaMessageId id = (KafkaMessageId) msgId;
		PartitionManager m = _coordinator.getManager(id.partition);
		if (m != null) {
			m.fail(id.offset);
		}
	}

	@Override
	public void deactivate() {
		// 停止工作
		commit();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		System.out.println(_spoutConfig.scheme.getOutputFields());
		declarer.declare(_spoutConfig.scheme.getOutputFields());
	}

	private void commit() {
		_lastUpdateMs = System.currentTimeMillis();
		for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
			manager.commit();
		}
	}

}

       在粗浅的代码阅读之后,在这里进行详细的分析:

      1  KafkaSpout之中持有了一个 MessageAndRealOffset 的内部类

public static class MessageAndRealOffset
{
    public Message msg;
    
    public long offset;
    
    public MessageAndRealOffset(Message msg,long offset)
    {
        this.msg = msg;
        this.offset = offset;
    }
}

    2 在Spout之中我们还持有了一个PartitionCoordinator的分区协调器,默认的情况我们实例化的对象

是ZKCoordinator

    

关于Storm如何和Kafka进行整合就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

返回云计算教程...