What do the core fields of Kafka's network engine look like, and how is it initialized? This article walks through the relevant source code to answer that question.
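Kafka's network layer uses multiple threads and multiple Selectors, much like the RMNT approach (Redis is a bit of a special case; I won't repeat this caveat below).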
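The core class is SocketServer. It holds one Acceptor per endpoint to accept new connections, and each Acceptor is paired with several Processor threads. Each Processor thread owns its own independent Selector, which it uses mainly to read requests from connections and write responses back.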
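Each Processor owns its own Selector, so a given socket stays within that Processor's scope until the socket's life ends.
And it is not only reading requests that this Processor's Selector thread handles; writing responses back is its job too.
In a word: I'll take responsibility for you all the way!!! :)
If you think about it, that is only natural: the same socket cannot be handled by Selector A one moment and Selector B the next, or things would descend into chaos. Better to keep it stable!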
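To make the "one socket, one Processor for life" idea concrete, here is a minimal Java sketch. StickyAssigner and its methods are hypothetical names for illustration, not Kafka's API, and the round-robin choice is an assumption of the sketch (Kafka's Acceptor appears to rotate through its processors in a similar way, but check the source of your version).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a new connection is handed to one Processor (round-robin assumed),
// and from then on every read and write for that connection goes through the same Processor.
class StickyAssigner {
    private final int numProcessors;
    private int next = 0;                                        // round-robin cursor
    private final Map<String, Integer> owner = new HashMap<>();  // connectionId -> processor id

    StickyAssigner(int numProcessors) {
        this.numProcessors = numProcessors;
    }

    // Called once when a connection is accepted; an existing connection never moves.
    int assign(String connectionId) {
        Integer existing = owner.get(connectionId);
        if (existing != null) return existing;
        int id = next;
        next = (next + 1) % numProcessors;
        owner.put(connectionId, id);
        return id;
    }

    // Every later read/write looks up the same owner.
    int ownerOf(String connectionId) {
        Integer id = owner.get(connectionId);
        if (id == null) throw new IllegalStateException("unknown connection " + connectionId);
        return id;
    }

    // When the socket's life ends, the mapping is dropped.
    void close(String connectionId) {
        owner.remove(connectionId);
    }

    public static void main(String[] args) {
        StickyAssigner assigner = new StickyAssigner(3);
        for (String c : new String[] {"c1", "c2", "c3", "c4"})
            System.out.println(c + " -> processor " + assigner.assign(c));
        System.out.println("c1 still -> processor " + assigner.ownerOf("c1"));
    }
}
```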
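Besides the Processors there are multiple Handler threads (KafkaRequestHandler, sized by num.io.threads), usually called the business thread pool. Note that they are not tied to a particular Acceptor: all Handler threads take requests from the single request queue inside RequestChannel.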
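So if you interview somewhere else and are asked how Netty handles time-consuming business logic, and you don't say you would hand it off to a separate business thread pool, trust me, the interviewer will inwardly look down on you :)
Never say that Netty runs slow business logic on the same threads as the IO thread pool; you will definitely lose points!
Say that, and there is basically no point asking you anything more about Netty.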
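---One thing to note: the results produced by the business thread pool must be handed back to the IO threads, i.e. the Processor threads.
The two kinds of threads communicate through RequestChannel.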
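A minimal Java sketch of the Processor/handler handoff, assuming a simplified model: one shared, bounded request queue and one response queue per Processor, with each request remembering which Processor read it so the result can go back to the thread that owns the socket. HandoffDemo, Request and roundTrip are hypothetical names; the real RequestChannel carries much richer request and response objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the Processor <-> handler handoff (not Kafka's API).
class HandoffDemo {
    static final class Request {
        final int processorId;   // which Processor read this request
        final String payload;
        Request(int processorId, String payload) { this.processorId = processorId; this.payload = payload; }
    }

    // Shared by every Processor (producers) and every handler thread (consumers).
    final BlockingQueue<Request> requestQueue;
    // One response queue per Processor, so each response returns to the Processor owning the socket.
    final List<BlockingQueue<String>> responseQueues = new ArrayList<>();

    HandoffDemo(int numProcessors, int queueSize) {
        requestQueue = new ArrayBlockingQueue<>(queueSize);
        for (int i = 0; i < numProcessors; i++) responseQueues.add(new LinkedBlockingQueue<>());
    }

    // A handler ("business") thread: take a request, do the slow work, route the result home.
    void startHandler() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    Request r = requestQueue.take();
                    String result = r.payload.toUpperCase();   // stand-in for real business logic
                    responseQueues.get(r.processorId).put(result);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();             // exit quietly when interrupted
            }
        }, "handler");
        t.setDaemon(true);                                      // don't keep the JVM alive
        t.start();
    }

    // What a Processor does: enqueue the request, later pick up its own response.
    String roundTrip(int processorId, String payload) {
        try {
            requestQueue.put(new Request(processorId, payload));
            return responseQueues.get(processorId).poll(5, TimeUnit.SECONDS); // null on timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        HandoffDemo channel = new HandoffDemo(3, 500);
        channel.startHandler();
        System.out.println(channel.roundTrip(1, "fetch")); // FETCH
    }
}
```

The per-Processor response queues are the design point: a handler never touches a socket, it only drops the result where the owning Processor will find it.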
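In Thrift, this is done by wrapping a FrameBuffer in a Runnable:
protected Runnable getRunnable(FrameBuffer frameBuffer) {
    return new Invocation(frameBuffer);
}
A FrameBuffer encapsulates one complete, business-level request body;
think of it as the equivalent of a complete HTTP request body.
Technical background: TCP is a byte-stream protocol with no message boundaries, so bytes must first be reassembled into a complete request!!! Enough said.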
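Because TCP delivers a byte stream, a single read() may return half a request or one and a half requests. The sketch below assumes a 4-byte big-endian length prefix in front of each request (the framing used by Kafka's wire protocol and by Thrift's framed transport); FrameDecoder is a hypothetical class, not Thrift's FrameBuffer or Kafka's NetworkReceive.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: turn a TCP byte stream back into complete, length-prefixed requests.
class FrameDecoder {
    private ByteBuffer pending = ByteBuffer.allocate(0); // bytes received but not yet part of a full frame

    // Feed whatever one read() returned; get back zero or more complete frames.
    List<byte[]> feed(byte[] chunk) {
        ByteBuffer buf = ByteBuffer.allocate(pending.remaining() + chunk.length);
        buf.put(pending).put(chunk).flip();
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int size = buf.getInt();                       // ByteBuffer reads big-endian by default
            if (size < 0) throw new IllegalStateException("invalid frame size " + size);
            if (buf.remaining() < size) {                  // body has not fully arrived yet
                buf.reset();                               // un-read the size prefix
                break;
            }
            byte[] frame = new byte[size];
            buf.get(frame);
            frames.add(frame);
        }
        pending = buf.slice();                             // keep any partial frame for the next read
        return frames;
    }

    // Encode one message as [4-byte size][bytes].
    static byte[] encode(String s) {
        byte[] body = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] stream = concat(encode("ping"), encode("metadata"));
        FrameDecoder decoder = new FrameDecoder();
        // Simulate TCP splitting the stream at arbitrary points.
        int[] cuts = {0, 6, 13, stream.length};
        for (int i = 0; i + 1 < cuts.length; i++) {
            for (byte[] f : decoder.feed(Arrays.copyOfRange(stream, cuts[i], cuts[i + 1])))
                System.out.println(new String(f, StandardCharsets.UTF_8));
        }
    }
}
```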
Core fields of SocketServer
Source location:
find . -name 'SocketServer.*'
./core/src/main/scala/kafka/network/SocketServer.scala
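1) endpoints:
A collection of EndPoints. A server usually has several NICs and can therefore be configured with several IPs, and Kafka can listen on several ports at the same time.
An endpoint defines the host, port, protocol and related information,
and each endpoint gets its own Acceptor object.
This is a bit like ActiveMQ, which supports multiple protocols and opens a separate TCP listening port for each one,
so a single ActiveMQ process actually holds quite a few listening ports.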
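The listeners setting (for example PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093) is what yields several endpoints. The hypothetical parser below is not Kafka's, which also handles IPv6 literals, listener-to-protocol maps and validation; it only shows how one setting turns into several endpoints, each of which would get its own Acceptor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Kafka's parser): split a listeners-style value into endpoints.
class ListenerDemo {
    static final class EndPoint {
        final String listenerName;
        final String host;
        final int port;

        EndPoint(String listenerName, String host, int port) {
            this.listenerName = listenerName;
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() { return listenerName + "://" + host + ":" + port; }
    }

    // "NAME://host:port,NAME2://host:port" -> one EndPoint per entry (IPv4 or hostname only).
    static List<EndPoint> parse(String listeners) {
        List<EndPoint> endpoints = new ArrayList<>();
        for (String raw : listeners.split(",")) {
            String s = raw.trim();
            int sep = s.indexOf("://");
            int colon = s.lastIndexOf(':');
            if (sep <= 0 || colon <= sep) throw new IllegalArgumentException("bad listener: " + s);
            endpoints.add(new EndPoint(s.substring(0, sep), s.substring(sep + 3, colon),
                    Integer.parseInt(s.substring(colon + 1))));
        }
        return endpoints;
    }

    public static void main(String[] args) {
        for (EndPoint ep : parse("PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093"))
            System.out.println(ep + " -> 1 Acceptor");
    }
}
```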
2) numProcessorThreads & totalProcessorThreads
numProcessorThreads is the number of Processor threads per endpoint.
What about the second one? Since there can be several endpoints, it is the number of endpoints × numProcessorThreads.
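3) maxQueuedRequests: the maximum number of buffered requests.
Think about it: in Thrift, what is the maximum number of requests that can be buffered? :)
Processor threads read requests from the sockets and put them into this queue as a buffer.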
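A bounded queue such as ArrayBlockingQueue is what caps the number of buffered requests. This sketch uses offer() so the cap is visible as a return value; as far as I can tell, Kafka 0.11's Processor enqueues with a blocking put() instead, so a full queue stalls the network threads rather than dropping requests (backpressure). BoundedQueueDemo is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a bounded request queue in the spirit of queued.max.requests (hypothetical names).
class BoundedQueueDemo {
    // Offer n requests to a queue of the given capacity; return how many were accepted.
    static int accepted(int capacity, int n) {
        BlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(capacity);
        int ok = 0;
        for (int i = 0; i < n; i++) {
            // offer() returns false once the queue is full; put() would block the caller instead
            if (requestQueue.offer("request-" + i)) ok++;
        }
        return ok;
    }

    public static void main(String[] args) {
        System.out.println(accepted(500, 600)); // 500
    }
}
```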
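4) maxConnectionsPerIp: the maximum number of connections that can be opened from a single IP.
By default it is effectively unlimited (Int.MaxValue). That makes sense: would you really want to stop clients from connecting???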
5) maxConnectionsPerIpOverrides: per-IP overrides of the limit above (max.connections.per.ip.overrides); not covered here.
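A minimal sketch of per-IP connection counting with overrides, in the spirit of Kafka's ConnectionQuotas. QuotaDemo and TooManyConnections are hypothetical names, and the sketch keys on IP strings rather than InetAddress to stay self-contained; Kafka's real class differs in detail.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-IP connection counting with per-IP overrides.
class QuotaDemo {
    static final class TooManyConnections extends RuntimeException {
        TooManyConnections(String ip, int limit) {
            super("too many connections from " + ip + " (limit " + limit + ")");
        }
    }

    private final int defaultMax;                     // like max.connections.per.ip
    private final Map<String, Integer> overrides;     // like max.connections.per.ip.overrides
    private final Map<String, Integer> counts = new HashMap<>();

    QuotaDemo(int defaultMax, Map<String, Integer> overrides) {
        this.defaultMax = defaultMax;
        this.overrides = new HashMap<>(overrides);
    }

    // Called when a connection from `ip` is accepted.
    synchronized void inc(String ip) {
        int limit = overrides.getOrDefault(ip, defaultMax);
        int current = counts.getOrDefault(ip, 0);
        if (current >= limit) throw new TooManyConnections(ip, limit);
        counts.put(ip, current + 1);
    }

    // Called when a connection from `ip` is closed.
    synchronized void dec(String ip) {
        Integer current = counts.get(ip);
        if (current == null) throw new IllegalArgumentException("no open connections from " + ip);
        if (current == 1) counts.remove(ip); else counts.put(ip, current - 1);
    }

    synchronized int count(String ip) {
        return counts.getOrDefault(ip, 0);
    }

    public static void main(String[] args) {
        Map<String, Integer> overrides = new HashMap<>();
        overrides.put("10.0.0.9", 3);
        QuotaDemo quotas = new QuotaDemo(2, overrides);
        quotas.inc("10.0.0.1");
        quotas.inc("10.0.0.1");
        try {
            quotas.inc("10.0.0.1");                   // third connection exceeds the default of 2
        } catch (TooManyConnections e) {
            System.out.println(e.getMessage());
        }
    }
}
```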
6) requestChannel: the queue
The queue that carries Kafka's logically complete requests. Just as a complete HTTP request body corresponds to an HttpRequest, in Thrift the counterpart is the FrameBuffer wrapped in a Runnable, as shown earlier.
Self-explanatory!
7) Acceptors:
The collection of Acceptor objects, one per Endpoint. Self-explanatory!
8) Processors:
The collection of IO threads. Self-explanatory!
=== With the core fields covered, let's look at how SocketServer is initialized ===
First, as usual, set up a Linux debugging environment.
1) Start the Kafka server
2) Look up its launch command
/root/myAllFiles/jdk1.8.0_111/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -cp /root/leveldb_0.9:/root/leveldb_0.9/*::/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/argparse4j-0.7.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/commons-lang3-3.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-api-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-file-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-json-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-runtime-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-transforms-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/guava-20.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-api-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-locator-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-utils-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-core-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-databind-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javassist-3.21.0-GA.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.annotation-api-1.2.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.servlet-api-3.1.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-client-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-common-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-core-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-guava-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-media-jaxb-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-server-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-http-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-io-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-security-9.2.15.v20160210.jar:
3) Build the debug command
jdb -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../logs -Dlog4j.configuration=file:bin/../config/log4j.properties -classpath /root/leveldb_0.9:/root/leveldb_0.9/*::/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/aopalliance-repackaged-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/argparse4j-0.7.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/commons-lang3-3.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-api-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-file-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-json-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-runtime-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/connect-transforms-0.11.0.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/guava-20.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-api-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-locator-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/hk2-utils-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-core-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-databind-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-base-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-jaxrs-json-provider-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jackson-module-jaxb-annotations-2.8.5.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javassist-3.21.0-GA.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.annotation-api-1.2.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.inject-2.5.0-b05.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.servlet-api-3.1.0.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/javax.ws.rs-api-2.0.1.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-client-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-common-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-container-servlet-core-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-guava-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-media-jaxb-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jersey-server-2.24.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-http-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-io-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/bin/../libs/jetty-security-9.2.15.v20160210.jar:/root/kafka_newest_edition/kafka_2.11-0.11.0.0_cluster0/libs/*: kafka.Kafka config/server.properties
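If step 3 reports an error, readers can work out the fix themselves :)
OK, once it is running under jdb, our goal is to get a rough picture of how SocketServer runs; each component's implementation will be covered in detail later.
At startup, SocketServer iterates over the EndPoints and starts the corresponding threads :)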
To install the Scala plugin, see http://www.cnblogs.com/xiyuan2016/p/6626825.html
http://scala-ide.org/download/prev-stable.html
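------ Warming up: a first debugging session ------
First, set a breakpoint:
stop at kafka.network.SocketServer:54
Then you can start debugging; if necessary, add the source directory yourself (for example with jdb's use command).
As shown in the figure: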
================ Now for the real debugging ==================
Tip:
For how the properties in the config file map to fields, see:
kafka.server.KafkaConfig.scala
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
  // only one endpoint by default
  private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
  private val numProcessorThreads = config.numNetworkThreads // 3 in the config file; val NumNetworkThreadsProp = "num.network.threads"
  private val maxQueuedRequests = config.queuedMaxRequests // default 500; val QueuedMaxRequestsProp = "queued.max.requests"
  private val totalProcessorThreads = numProcessorThreads * endpoints.size // 3 * 1
  private val maxConnectionsPerIp = config.maxConnectionsPerIp // default Int.MaxValue -> 2147483647; val MaxConnectionsPerIpProp = "max.connections.per.ip"
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
Next, the RequestChannel object is initialized:
val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
Stepping into it:
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) // channel into the business (handler) thread pool
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) // return channels from the handler pool, one per Processor
  for (i <- 0 until numProcessors) // initialize each one as a LinkedBlockingQueue
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
Back in SocketServer.scala:
private val processors = new Array[Processor](totalProcessorThreads) // room for the IO (Processor) threads
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]() // one Acceptor per EndPoint
private var connectionQuotas: ConnectionQuotas = _
// register the processor threads for notification of responses
requestChannel.addResponseListener(id => processors(id).wakeup())
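In other words, when a response from the business threads is ready, the Selector of the IO thread that owns the connection must be woken up, because that Processor may be blocked in its Selector waiting for network events.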
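Why the wakeup matters can be shown with the real java.nio.channels.Selector: a thread blocked in select() stays blocked until a channel becomes ready, the selector is woken up, or the thread is interrupted. In this sketch, WakeupDemo, addResponseListener and responseReady are hypothetical stand-ins for the RequestChannel listener mechanism, not Kafka's classes.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Sketch of the response-listener wakeup (WakeupDemo and its methods are hypothetical).
class WakeupDemo {
    private final List<IntConsumer> responseListeners = new ArrayList<>();

    void addResponseListener(IntConsumer listener) { responseListeners.add(listener); }

    // Stand-in for "a handler just queued a response for processor `processorId`".
    void responseReady(int processorId) {
        for (IntConsumer listener : responseListeners) listener.accept(processorId);
    }

    // Returns true if a thread blocked in select() was released by the listener.
    static boolean demo() {
        try (Selector selector = Selector.open()) {       // processor 0's own Selector
            Thread processor = new Thread(() -> {
                try {
                    selector.select();                      // no channels, no timeout: blocks until woken
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }, "processor-0");
            processor.start();

            Selector[] selectors = {selector};
            WakeupDemo channel = new WakeupDemo();
            channel.addResponseListener(id -> selectors[id].wakeup()); // like processors(id).wakeup()

            Thread.sleep(100);                              // let select() block; an earlier wakeup would also count
            channel.responseReady(0);
            processor.join(5000);
            return !processor.isAlive();
        } catch (IOException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("processor released: " + demo());
    }
}
```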
Next, the startup method runs; this is the core part:
Step completed: "thread=main", kafka.network.SocketServer.startup(), line=74 bci=0
Let's see what it does:
/**
 * Start the socket server
 */
def startup() {
  this.synchronized {
    // connection quotas
    connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    // some TCP parameters
    val sendBufferSize = config.socketSendBufferBytes // config file: 102400; val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
    val recvBufferSize = config.socketReceiveBufferBytes // config file: 102400; val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
    val brokerId = config.brokerId // self-explanatory
    var processorBeginIndex = 0
    config.listeners.foreach { endpoint =>
      val listenerName = endpoint.listenerName
      val securityProtocol = endpoint.securityProtocol
      val processorEndIndex = processorBeginIndex + numProcessorThreads // each endpoint gets this many Processor threads
      for (i <- processorBeginIndex until processorEndIndex)
        processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol) // initialize the Processor threads
      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) // initialize the Acceptor thread
      acceptors.put(endpoint, acceptor)
      Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
      acceptor.awaitStartup()
      processorBeginIndex = processorEndIndex
    }
  }
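The index bookkeeping in the loop above can be replayed in a few lines of Java. SliceDemo.assign is a hypothetical helper: processor ids stand in for the Processor objects, and Arrays.copyOfRange plays the role of processors.slice(begin, end).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Replays the processorBeginIndex/processorEndIndex bookkeeping from startup() (hypothetical helper).
class SliceDemo {
    static Map<String, int[]> assign(List<String> endpoints, int numProcessorThreads) {
        int totalProcessorThreads = numProcessorThreads * endpoints.size();
        int[] processors = new int[totalProcessorThreads];        // ids stand in for Processor objects
        Map<String, int[]> perAcceptor = new LinkedHashMap<>();
        int processorBeginIndex = 0;
        for (String endpoint : endpoints) {
            int processorEndIndex = processorBeginIndex + numProcessorThreads;
            for (int i = processorBeginIndex; i < processorEndIndex; i++)
                processors[i] = i;                                 // like newProcessor(i, ...)
            // like processors.slice(begin, end): this endpoint's Acceptor gets only its own range
            perAcceptor.put(endpoint, Arrays.copyOfRange(processors, processorBeginIndex, processorEndIndex));
            processorBeginIndex = processorEndIndex;
        }
        return perAcceptor;
    }

    public static void main(String[] args) {
        Map<String, int[]> m = assign(Arrays.asList("PLAINTEXT:9092", "SSL:9093"), 3);
        for (Map.Entry<String, int[]> e : m.entrySet())
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
    }
}
```

With two endpoints and 3 threads each, this prints PLAINTEXT:9092 -> [0, 1, 2] and SSL:9093 -> [3, 4, 5]: totalProcessorThreads is 6, and each Acceptor only ever sees its own contiguous slice.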
Shutting down is comparatively simple:
/**
 * Shutdown the socket server
 */
def shutdown() = { // shutdown
  info("Shutting down")
  this.synchronized {
    acceptors.values.foreach(_.shutdown) // shut down the Acceptors
    processors.foreach(_.shutdown) // shut down the Processors
  }
  info("Shutdown completed")
}
AbstractServerThread
Look at the following two classes:
/**
 * Thread that accepts and configures new connections. There is one of these per endpoint.
 */
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

/**
 * Thread that processes all requests from a single connection. There are N of these running in parallel
 * each of which has its own selector
 */
private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
As we can see, both Acceptor and Processor extend the AbstractServerThread class.
AbstractServerThread is an abstract class implementing Runnable; it provides the shared startup/shutdown machinery for both the Acceptor and the Processor threads!
Tip:
Pause for a moment and recall how Netty does this. Doesn't it look familiar?
/**
 * A base class with some helper variables and methods
 */
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
  private val startupLatch = new CountDownLatch(1) // marks whether startup has completed
  // `shutdown()` is invoked before `startupComplete` and `shutdownComplete` if an exception is thrown in the constructor
  // (e.g. if the address is already in use). We want `shutdown` to proceed in such cases, so we first assign an open
  // latch and then replace it in `startupComplete()`.
  @volatile private var shutdownLatch = new CountDownLatch(0) // marks whether shutdown has completed
  private val alive = new AtomicBoolean(true) // whether the thread is still running
  def wakeup(): Unit
  /**
   * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
   */
  def shutdown(): Unit = {
    alive.set(false)
    wakeup()
    shutdownLatch.await()
  }
  /**
   * Wait for the thread to completely start up
   */
  def awaitStartup(): Unit = startupLatch.await
  /**
   * Record that the thread startup is complete
   */
  protected def startupComplete(): Unit = {
    // Replace the open latch with a closed one
    shutdownLatch = new CountDownLatch(1)
    startupLatch.countDown()
  }
  /**
   * Record that the thread shutdown is complete
   */
  protected def shutdownComplete(): Unit = shutdownLatch.countDown()
  /**
   * Is the server still running?
   */
  protected def isRunning: Boolean = alive.get
  /**
   * Close the connection identified by `connectionId` and decrement the connection count.
   */
  def close(selector: KSelector, connectionId: String): Unit = { // close the socket and decrement the connection count
    val channel = selector.channel(connectionId)
    if (channel != null) {
      debug(s"Closing selector connection $connectionId")
      val address = channel.socketAddress
      if (address != null)
        connectionQuotas.dec(address)
      selector.close(connectionId)
    }
  }
  /**
   * Close `channel` and decrement the connection count.
   */
  def close(channel: SocketChannel): Unit = {
    if (channel != null) {
      debug("Closing connection from " + channel.socket.getRemoteSocketAddress())
      connectionQuotas.dec(channel.socket.getInetAddress)
      swallowError(channel.socket().close())
      swallowError(channel.close())
    }
  }
}
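The latch choreography above can be transliterated into Java to watch it work. This is a simplified sketch, not Kafka code: ServerThreadSketch mirrors the fields and methods shown, and IdleWorker is a hypothetical subclass that uses Object.wait/notifyAll as a stand-in for a Selector.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified Java transliteration of the AbstractServerThread lifecycle (not Kafka code).
abstract class ServerThreadSketch implements Runnable {
    private final CountDownLatch startupLatch = new CountDownLatch(1);
    private volatile CountDownLatch shutdownLatch = new CountDownLatch(0); // open until startup completes
    private final AtomicBoolean alive = new AtomicBoolean(true);

    abstract void wakeup();

    void shutdown() throws InterruptedException {
        alive.set(false);          // 1. signal stop
        wakeup();                  // 2. break out of any blocking wait
        shutdownLatch.await();     // 3. wait for run() to finish
    }

    void awaitStartup() throws InterruptedException { startupLatch.await(); }

    protected void startupComplete() {
        shutdownLatch = new CountDownLatch(1); // from now on shutdown() must wait
        startupLatch.countDown();
    }

    protected void shutdownComplete() { shutdownLatch.countDown(); }

    protected boolean isRunning() { return alive.get(); }
}

// Hypothetical worker that idles until told to stop, like a Processor waiting in its Selector.
class IdleWorker extends ServerThreadSketch {
    private final Object lock = new Object();

    @Override
    void wakeup() {
        synchronized (lock) { lock.notifyAll(); }
    }

    @Override
    public void run() {
        startupComplete();
        try {
            while (isRunning()) {
                synchronized (lock) {
                    if (isRunning()) lock.wait(1000);   // stand-in for selector.poll(timeout)
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            shutdownComplete();
        }
    }

    // Start, wait for startup (like acceptor.awaitStartup()), then shut down gracefully.
    static boolean demo() {
        IdleWorker worker = new IdleWorker();
        Thread t = new Thread(worker, "idle-worker");
        t.start();
        try {
            worker.awaitStartup();
            worker.shutdown();      // returns only after shutdownComplete()
            t.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("clean shutdown: " + demo());
    }
}
```

Note the trick described in the source comment: shutdownLatch starts as an already-open CountDownLatch(0), so shutdown() cannot hang if construction fails before startupComplete() ever runs.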