moquette源码分析之七--qos1和qos2消息的处理
更新:HHH   时间:2023-1-7


首先解释一下mqtt协议的session的概念,因为只有有了session才会存在消息质量保证一说

如果清理会话(CleanSession)标志被设置为0,服务端必须基于当前会话(使用客户端标识符识别)的状态恢复与客户端的通信。如果没有与这个客户端标识符关联的会话,服务端必须创建一个新的会话。在连接断开之后,当连接断开后,客户端和服务端必须保存会话信息 [MQTT-3.1.2-4]。当清理会话标志为0的会话连接断开之后,服务端必须将之后的QoS 1和QoS 2级别的消息保存为会话状态的一部分,如果这些消息匹配断开连接时客户端的任何订阅 [MQTT-3.1.2-5]。服务端也可以保存满足相同条件的QoS 0级别的消息。
如果清理会话(CleanSession)标志被设置为1,客户端和服务端必须丢弃之前的任何会话并开始一个新的会话。会话仅持续和网络连接同样长的时间。与这个会话关联的状态数据不能被任何之后的会话重用 [MQTT-3.1.2-6]。
客户端的会话状态包括:
● 已经发送给服务端,但是还没有完成确认的QoS 1和QoS 2级别的消息
● 已从服务端接收,但是还没有完成确认的QoS 2级别的消息。
服务端的会话状态包括:
● 会话是否存在,即使会话状态的其它部分都是空。
● 客户端的订阅信息。
● 已经发送给客户端,但是还没有完成确认的QoS 1和QoS 2级别的消息。
● 即将传输给客户端的QoS 1和QoS 2级别的消息。
● 已从客户端接收,但是还没有完成确认的QoS 2级别的消息。
● 可选,准备发送给客户端的QoS 0级别的消息。
保留消息不是服务端会话状态的一部分,会话终止时不能删除保留消息 [MQTT-3.1.2.7]。
有关状态存储的限制和细节见第 4.1节。
当清理会话标志被设置为1时,客户端和服务端的状态删除不需要是原子操作。
非规范评注
为了确保在发生故障时状态的一致性,客户端应该使用会话状态标志1重复请求连接,直到连接成功。
非规范评注
一般来说,客户端连接时总是将清理会话标志设置为0或1,并且不交替使用两种值。这个选择取决于具体的应用。清理会话标志设置为1的客户端不会收到旧的应用消息,而且在每次连接成功后都需要重新订阅任何相关的主题。清理会话标志设置为0的客户端会收到所有在它连接断开期间发布的QoS 1和QoS 2级别的消息。因此,要确保不丢失连接断开期间的消息,需要使用QoS 1或 QoS 2级别,同时将清理会话标志设置为0。
非规范评注
清理会话标志0的客户端连接时,它请求服务端在连接断开后保留它的MQTT会话状态。如果打算在之后的某个时间点重连到这个服务端,客户端连接应该只使用清理会话标志0。当客户端决定之后不再使用这个会话时,应该将清理会话标志设置为1最后再连接一次,然后断开连接。
一。mqtt协议支持三种消息等级,分别是:

1.0--AT_MOST_ONCE,至多一次
2.1--AT_LEAST_ONCE,至少一次
3.2--EXACTLY_ONCE,有且仅有一次

二。其实AMQP协议也支持这三种。我们来看一下mqtt对于这三种消息的实现方式,对于理解AMQP的消息等级也有帮助

AT_MOST_ONCE比较简单,这里不说了,只说后两种

先看消息等级为1,moquette是怎么实现的。
先上一张图,便于理解,

moquette对于session的实现在这里:这里以session存在在内存的实现来讲解
io.moquette.persistence.MemorySessionStore

    class Session {
    final String clientID;
    final ClientSession clientSession;
    final Map<Topic, Subscription> subscriptions = new ConcurrentHashMap<>();
    final AtomicReference<PersistentSession> persistentSession = new AtomicReference<>(null);
    final BlockingQueue<StoredMessage> queue = new ArrayBlockingQueue<>(Constants.MAX_MESSAGE_QUEUE);
    final Map<Integer, StoredMessage> secondPhaseStore = new ConcurrentHashMap<>();
    final Map<Integer, StoredMessage> outboundFlightMessages =
            Collections.synchronizedMap(new HashMap<Integer, StoredMessage>());
    final Map<Integer, StoredMessage> inboundFlightMessages = new ConcurrentHashMap<>();

    Session(String clientID, ClientSession clientSession) {
        this.clientID = clientID;
        this.clientSession = clientSession;
    }
}

    对于broker分发的qos1消息,在没有收到ack消息之前是存储在outboundFlightMessages里面的。
    结合上面的流程图,很好理解了
    这里补充说明一点,moquette的实现并没有对以发送出去,而没有收到ack消息做处理,按理说这是session的一部分,如果客户端要求保持会话,客户端断线重连之后,是应该进行重发的,当然大家也可以自己实现。

再看消息等级为2,看下面的图

结合上面的图就很好理解了,这里注意,moquette 其实在分发消息之前,是应该先从nflightBound..remove(messageId_1)的,但是它没有这么做,这其实是个bug,大家如果没用到cleanSession和qos2消息还好,用到了就有可能存在内存泄露

三。末尾说一下mqtt的重发机制,

mqtt- broker只会在下面一种情况下重发消息:
必须满足两个条件:
1.客户端要求broker保持会话,就是说qos1和qos2消息是会话的一部分。(上一次连接的时候要求保存会话)
2.客户端重连成功,就是说连接正常能够通信,也就是说broker对于qos1和qos2消息只做支持,不做保证(tcp连接正常,四层传输正常)
因为它没法做保证,网络不通,机器断电,磁盘损害等。qos1和qos2只在连接正常的前提之下做出保证。
返回开发技术教程...