小编给大家分享一下Kafka通讯协议是怎么样的,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
Kafka的Producer、Broker和Consumer之间采用的是一套自行设计的基于TCP层的协议。Kafka的这套协议完全是为了Kafka自身的业务需求而定制的,而非要实现一套类似于Protocol Buffer的通用协议。
基本数据类型
定长数据类型:int8,int16,int32和int64,对应到Java中就是byte, short, int和long。
变长数据类型:bytes和string。变长的数据类型由两部分组成,分别是一个有符号整数N(表示内容的长度)和N个字节的内容。其中,N为-1表示内容为null。bytes的长度由int32表示,string的长度由int16表示。
数组:数组由两部分组成,分别是一个由int32类型的数字表示的数组长度N和N个元素。
Request和Response的基本结构
Kafka中两个角色之间通讯的基本单位是Request/Response,Request和Response的基本结构如下:
RequestOrResponse => MessageSize (RequestMessage | ResponseMessage)
其中各字段的含义为:
名称 | 类型 | 描述 |
---|
MessageSize | int32 | 表示RequestMessage或者ResponseMessage的长度 |
RequestMessage/ResponseMessage | - | 表示Request或者Response的内容,在下面将会介绍其具体格式。 |
这个结构定义了通讯双方交换数据的基本结构。通讯的过程可以简单地表示为:客户端打开与服务器端的Socket,然后往Socket写入一个int32的数字表示这次发送的Request有多少字节,然后继续往Socket中写入对应字节数的数据。服务器端先读出一个int32的整数从而获取这次Request的大小,然后读取对应字节数的数据从而得到Request的具体内容。服务器端处理了请求后,也用同样的方式来发送响应。
RequestMessage的结构
RequestMessage的结构如下:
RequestMessage => ApiKey ApiVersion CorrelationId ClientId Request
名称 | 类型 | 描述 |
---|
ApiKey | int16 | 表示这次请求的API编号 |
ApiVersion | int16 | 表示请求的API的版本,有了版本后就可以做到后向兼容 |
CorrelationId | int32 | 由客户端指定的一个数字唯一标示这次请求的id,服务器端在处理完请求后也会把同样的CorrelationId写到Response中,这样客户端就能把某个请求和响应对应起来了。 |
ClientId | string | 客户端指定的用来描述客户端的字符串,会被用来记录日志和监控,它唯一标示一个客户端。 |
Request | - | Request的具体内容。 |
ResponseMessage的结构
ResponseMessage的结构如下:
ResponseMessage => CorrelationId Response
名称 | 类型 | 描述 |
---|
CorrelationId | int32 | 对应Request的CorrelationId。 |
Response | - | 对应Request的Response,不同的Request的Response的字段是不一样的。 |
Message
Kafka是一个分布式消息系统,Producer生产消息并推送(Push)给Broker,然后Consumer再从Broker那里取走(Pull)消息。Producer生产的消息就是由Message来表示的,对用户来讲,它就是键-值对,来看看它的结构。
Message => Crc MagicByte Attributes Key Value
名称 | 类型 | 描述 |
---|
CRC | int32 | 表示这条消息(不包括CRC字段本身)的校验码 |
MagicByte | int8 | 表示消息格式的版本,用来做后向兼容,目前值为0 |
Attributes | int8 | 表示这条消息的元数据,目前最低两位用来表示压缩格式 |
Key | bytes | 表示这条消息的Key,可以为null |
Value | bytes | 表示这条消息的Value。Kafka支持消息嵌套,也就是把一条消息作为Value放到另外一条消息里面。 |
MessageSet
MessageSet用来组合多条Message,它在每条Message的基础上加上了Offset和MessageSize,其结构是:
MessageSet => [Offset MessageSize Message]
它的含义是MessageSet是个数组,数组的每个元素由三部分组成,分别是Offset,MessageSize和Message,它们的含义分别是:
名称 | 类型 | 描述 |
---|
Offset | int64 | 它用来作为log中的序列号,Producer在生产消息的时候还不知道具体的值是什么,可以随便填个数字进去 |
MessageSize | int32 | 表示这条Message的大小 |
Message | - | 表示这条Message的具体内容,其格式见上一小节。 |
Message的压缩
Kafka支持下面几种压缩方式,
压缩方式 | 编码 |
---|
不压缩 | 0 |
Gzip | 1 |
Snappy | 2 |
LZ4 | 3 |
其中编码就是Message的Attribute的最低两位的值。
因为单条消息中重复内容可能不多,所以通常把多条消息放在一起组成MessageSet,然后再把MessageSet放到一条Message里面去,从而提高压缩比率。
Request/Respone和Message/MessageSet的关系
以上是“Kafka通讯协议是怎么样的”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注天达云行业资讯频道!