这篇文章主要介绍“PartitionManager分区管理器怎么使用”。在日常操作中,相信很多人在“PartitionManager分区管理器怎么使用”这个问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答“PartitionManager分区管理器怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
阅读背景:对于 Java 内部类有一个粗浅的认识
阅读目的:了解 Kafka 分区是如何在 Storm 接口之中进行管理的
最终主题:详尽地梳理 PartitionManager 的整个过程
package com.mixbox.storm.kafka;
import backtype.storm.Config;
import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import com.mixbox.storm.kafka.KafkaSpout.EmitState;
import com.mixbox.storm.kafka.KafkaSpout.MessageAndRealOffset;
import com.mixbox.storm.kafka.trident.MaxMetric;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* 分区的管理器
*
* @author Yin Shuai
*
*/
public class PartitionManager {
public static final Logger LOG = LoggerFactory
.getLogger(PartitionManager.class);
private final CombinedMetric _fetchAPILatencyMax;
private final ReducedMetric _fetchAPILatencyMean;
private final CountMetric _fetchAPICallCount;
private final CountMetric _fetchAPIMessageCount;
/**
* kafka MessageID 封装了 partition 和offset
*
* @author Yin Shuai
*/
static class KafkaMessageId {
public Partition partition;
public long offset;
public KafkaMessageId(Partition partition, long offset) {
this.partition = partition;
this.offset = offset;
}
}
// 被发送的偏移量
Long _emittedToOffset;
SortedSet<Long> _pending = new TreeSet<Long>();
// 已经提交的
Long _committedTo;
// 等待去发射
LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
// 分区
Partition _partition;
// Storm Spout的配置文件
SpoutConfig _spoutConfig;
// topology 的实例ID
String _topologyInstanceId;
// kafka 底层的消费者ID
SimpleConsumer _consumer;
// 动态的分区Connection
DynamicPartitionConnections _connections;
//ZKState 状态的维护
ZkState _state;
//Storm的配置文件
Map _stormConf;
//
@SuppressWarnings("unchecked")
public PartitionManager(DynamicPartitionConnections connections,
String topologyInstanceId, ZkState state, Map stormConf,
SpoutConfig spoutConfig, Partition id) {
_partition = id;
_connections = connections;
_spoutConfig = spoutConfig;
_topologyInstanceId = topologyInstanceId;
_consumer = connections.register(id.host, id.partition);
_state = state;
_stormConf = stormConf;
String jsonTopologyId = null;
Long jsonOffset = null;
String path = committedPath();
try {
Map<Object, Object> json = _state.readJSON(path);
LOG.info("Read partition information from: " + path + " --> "
+ json);
if (json != null) {
jsonTopologyId = (String) ((Map<Object, Object>) json
.get("topology")).get("id");
jsonOffset = (Long) json.get("offset");
}
} catch (Throwable e) {
LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
}
if (jsonTopologyId == null || jsonOffset == null) { // failed to parse
// JSON?
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
id.partition, spoutConfig);
LOG.info("No partition information found, using configuration to determine offset");
} else if (!topologyInstanceId.equals(jsonTopologyId)
&& spoutConfig.forceFromStart) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
id.partition, spoutConfig.startOffsetTime);
LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
} else {
_committedTo = jsonOffset;
LOG.info("Read last commit offset from zookeeper: " + _committedTo
+ "; old topology_id: " + jsonTopologyId
+ " - new topology_id: " + topologyInstanceId);
}
LOG.info("Starting " + _partition + " from offset " + _committedTo);
_emittedToOffset = _committedTo;
_fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
_fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
_fetchAPICallCount = new CountMetric();
_fetchAPIMessageCount = new CountMetric();
}
public Map getMetricsDataMap() {
Map ret = new HashMap();
ret.put(_partition + "/fetchAPILatencyMax",
_fetchAPILatencyMax.getValueAndReset());
ret.put(_partition + "/fetchAPILatencyMean",
_fetchAPILatencyMean.getValueAndReset());
ret.put(_partition + "/fetchAPICallCount",
_fetchAPICallCount.getValueAndReset());
ret.put(_partition + "/fetchAPIMessageCount",
_fetchAPIMessageCount.getValueAndReset());
return ret;
}
// returns false if it's reached the end of current batch
public EmitState next(SpoutOutputCollector collector) {
//等待去发送的 为空了。
if (_waitingToEmit.isEmpty()) {
// 开始装载
fill();
}
while (true) {
//检索并移除List中间的第一个元素
MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
//要发送的为空的时候, 没有发生的
if (toEmit == null) {
return EmitState.NO_EMITTED;
}
// 这里的tups
Iterable<List<Object>> tups = KafkaUtils.generateTuples(
_spoutConfig, toEmit.msg);
if (tups != null) {
for (List<Object> tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition,
toEmit.offset));
}
break;
} else {
ack(toEmit.offset);
}
}
if (!_waitingToEmit.isEmpty()) {
return EmitState.EMITTED_MORE_LEFT;
} else {
return EmitState.EMITTED_END;
}
}
/**
* 填充的行为
* 这里真正的决定了你有哪些数据需要去发送
*/
private void fill() {
long start = System.nanoTime();
/*
* 拿到MessageSet
*/
ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig,
_consumer, _partition, _emittedToOffset);
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_fetchAPILatencyMax.update(millis);
_fetchAPILatencyMean.update(millis);
_fetchAPICallCount.incr();
int numMessages = countMessages(msgs);
_fetchAPIMessageCount.incrBy(numMessages);
if (numMessages > 0) {
LOG.info("Fetched " + numMessages + " messages from: " + _partition);
}
for (MessageAndOffset msg : msgs) {
_pending.add(_emittedToOffset);
_waitingToEmit.add(new MessageAndRealOffset(msg.message(),
_emittedToOffset));
_emittedToOffset = msg.nextOffset();
}
if (numMessages > 0) {
LOG.info("Added " + numMessages + " messages from: " + _partition
+ " to internal buffers");
}
}
private int countMessages(ByteBufferMessageSet messageSet) {
int counter = 0;
for (MessageAndOffset messageAndOffset : messageSet) {
counter = counter + 1;
}
return counter;
}
public void ack(Long offset) {
_pending.remove(offset);
}
public void fail(Long offset) {
// TODO: should it use in-memory ack set to skip anything that's been
// acked but not committed???
// things might get crazy with lots of timeouts
if (_emittedToOffset > offset) {
_emittedToOffset = offset;
_pending.tailSet(offset).clear();
}
}
public void commit() {
// 最新完成的偏移量
long lastCompletedOffset = lastCompletedOffset();
//写最新的完全的偏移量到zk,的某个分区,到某一个topology
if (lastCompletedOffset != lastCommittedOffset()) {
LOG.info("Writing last completed offset (" + lastCompletedOffset
+ ") to ZK for " + _partition + " for topology: "
+ _topologyInstanceId);
Map<Object, Object> data = ImmutableMap
.builder()
.put("topology",
ImmutableMap.of("id", _topologyInstanceId, "name",
_stormConf.get(Config.TOPOLOGY_NAME)))
.put("offset", lastCompletedOffset)
.put("partition", _partition.partition)
.put("broker",
ImmutableMap.of("host", _partition.host.host,
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
// 直接JSON 写入
_state.writeJSON(committedPath(), data);
_committedTo = lastCompletedOffset;
LOG.info("Wrote last completed offset (" + lastCompletedOffset
+ ") to ZK for " + _partition + " for topology: "
+ _topologyInstanceId);
} else {
LOG.info("No new offset for " + _partition + " for topology: "
+ _topologyInstanceId);
}
}
//提交的路径
private String committedPath() {
return "/" + _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/"
+ _partition.getId();
}
//拿到最新的分区便宜量
public long queryPartitionOffsetLatestTime() {
return KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
_partition.partition, OffsetRequest.LatestTime());
}
//最新的提交的便宜量
public long lastCommittedOffset() {
return _committedTo;
}
public long lastCompletedOffset() {
if (_pending.isEmpty()) {
return _emittedToOffset;
} else {
return _pending.first();
}
}
//拿到最新的分区
public Partition getPartition() {
return _partition;
}
public void close() {
_connections.unregister(_partition.host, _partition.partition);
}
}
1: PartitionManager 内部定义了一个 static 嵌套类 KafkaMessageId,它封装了某个分区(partition)和该消息的偏移量(offset)
/**
 * Message id pairing a partition with a message offset.
 */
static class KafkaMessageId {
// Partition the message belongs to.
public Partition partition;
// Offset of the message within that partition.
public long offset;
// Stores the given partition and offset unchanged.
public KafkaMessageId(Partition partition, long offset) {
this.partition = partition;
this.offset = offset;
}
}
2: 在 PartitionManager 中同时持有以下实例变量:
2.1 已经取回、但尚未被 ack 的消息偏移量集合 _pending
2.2 已经提交的偏移量 _committedTo
2.3 等待去发射的消息 _waitingToEmit
2.4 具体的分区 _partition
其中 _waitingToEmit 是一个 LinkedList<MessageAndRealOffset>
3 : PartitionManager 在初始化的时候,需要传递的参数是
topologyInstanceId
DynamicPartitionConnections
ZkState
Map
SpoutConfig
Partition
另外,SimpleConsumer 对象并不是由调用方传入的参数:构造函数内部会调用 DynamicPartitionConnections 的
register 方法(传入分区的 host 和 partition)进行注册,并由此取得该分区对应的 SimpleConsumer
到此,关于“PartitionManager分区管理器怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注天达云网站,小编会继续努力为大家带来更多实用的文章!