mqtt协议-broker之moqutte源码研究六之集群
更新:HHH   时间:2023-1-7


moquette的集群功能是通过Hazelcast来实现的,对Hazelcast不了解的同学可以自行Google以下。
在讲解moquette的集群功能之前需要讲解一下moquette的拦截器,因为moquette对Hazelcast的集成本身就是通过拦截器来实现的。

一。拦截器
io.moquette.spi.impl.ProtocolProcessor类里面有一个BrokerInterceptor类,这个类就是broker拦截器,这个对象,在processConnect,processPubAck,processPubComp,processDisconnect,processConnectionLost,processUnsubscribe,processSubscribe,processPublish等八个地方都用到了,说明在broker处理各个报文的关键期间都会用到,我们先看一这个类的结构

 private static final Logger LOG = LoggerFactory.getLogger(BrokerInterceptor.class);
private final Map<Class<?>, List<InterceptHandler>> handlers;
private final ExecutorService executor;

private BrokerInterceptor(int poolSize, List<InterceptHandler> handlers) {
    LOG.info("Initializing broker interceptor. InterceptorIds={}", getInterceptorIds(handlers));
    this.handlers = new HashMap<>();
    for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
        this.handlers.put(messageType, new CopyOnWriteArrayList<InterceptHandler>());
    }
    for (InterceptHandler handler : handlers) {
        this.addInterceptHandler(handler);
    }
    executor = Executors.newFixedThreadPool(poolSize);
}

里面有个map,有一个,看构造方法,发现key是(InterceptConnectMessage.class, InterceptDisconnectMessage.class,
InterceptConnectionLostMessage.class, InterceptPublishMessage.class, InterceptSubscribeMessage.class,
InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class)其中的一个,发现总共有七个消息类,这七个消息类,刚好与上面的八个使用的地方对应上(processPubAck,processPubComp都对应InterceptAcknowledgedMessage)。value是一个list,里面放的是InterceptHandler,即处理器。
从这个map可以看出来,moquette允许你在,各个关键阶段注册一系列的处理器,供它回调。

二。HazelcastInterceptHandler
HazelcastInterceptHandler集成了AbstractInterceptHandler,AbstractInterceptHandler实现了InterceptHandler,这里吐槽一下Java,Java要求实现一个Java的interface就要实现所有的接口,可是很多时候我就只想实现其中一个方法,如果接口里面有10个方法结果
我的实现类里面会出现9个空方法,显得代码非常不简洁,当然,用个抽象类缓冲一下是一个办法,但是我又得加一个类,还好Java8,有个默认方法,能够解决这个问题。
我们跟踪一下,看一下HazelcastInterceptHandler,是什么时候被放进BrokerInterceptor的。

从启动类Server里面的main一路跟踪到下面的代码

    public void startServer(IConfig config, List<? extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator,
        IAuthenticator authenticator, IAuthorizator authorizator) throws IOException {
    if (handlers == null) {
        handlers = Collections.emptyList();
    }
    LOG.info("Starting Moquette Server. MQTT message interceptors={}", getInterceptorIds(handlers));

    scheduler = Executors.newScheduledThreadPool(1);

    final String handlerProp = System.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
    if (handlerProp != null) {
        config.setProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME, handlerProp);
    }
    configureCluster(config);
    final String persistencePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME);
    LOG.info("Configuring Using persistent store file, path={}", persistencePath);
    m_processorBootstrapper = new ProtocolProcessorBootstrapper();
    final ProtocolProcessor processor = m_processorBootstrapper.init(config, handlers, authenticator, authorizator,
        this);
    LOG.info("Initialized MQTT protocol processor");
    if (sslCtxCreator == null) {
        LOG.warn("Using default SSL context creator");
        sslCtxCreator = new DefaultMoquetteSslContextCreator(config);
    }

    LOG.info("Binding server to the configured ports");
    m_acceptor = new NettyAcceptor();
    m_acceptor.initialize(processor, config, sslCtxCreator);
    m_processor = processor;

    LOG.info("Moquette server has been initialized successfully");
    m_initialized = true;
}

    上面有三点需要注意,
    1.是从启动参数里面获取INTERCEPT_HANDLER_PROPERTY_NAME = "intercept.handler",放入config,说明注册处理器,是需要通过启动参数指定的,而且目前的版本还只能指定一个。
    2.configureCluster(config);配置集群,我们暂且不看怎么配置的。后面再讲
    3.通过ProtocolProcessorBootstrapper初始化ProtocolProcessor,这个类有多重要,前面几篇文章已经讲过了。它的init方法,很有意思,里面做了很多事情,基本上就是初始化ProtocolProcessor里面的各种对象,其中与本文要讲的摘抄下来

    LOG.info("Configuring message interceptors...");

    List<InterceptHandler> observers = new ArrayList<>(embeddedObservers);
    String interceptorClassName = props.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
    if (interceptorClassName != null && !interceptorClassName.isEmpty()) {
        InterceptHandler handler = loadClass(interceptorClassName, InterceptHandler.class, Server.class, server);
        if (handler != null) {
            observers.add(handler);
        }
    }
    BrokerInterceptor interceptor = new BrokBrokerInterceptor(props, observers);

这里面就是根据classname,实例化处理器,然后创建BrokerInterceptor对象,它的狗爪方法里面调用了这个方法addInterceptHandler

     @Override
public void addInterceptHandler(InterceptHandler interceptHandler) {
    Class<?>[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler);
    LOG.info("Adding MQTT message interceptor. InterceptorId={}, handledMessageTypes={}",
        interceptHandler.getID(), interceptedMessageTypes);
    for (Class<?> interceptMessageType : interceptedMessageTypes) {
        this.handlers.get(interceptMessageType).add(interceptHandler);
    }
}

    private static Class<?>[] getInterceptedMessageTypes(InterceptHandler interceptHandler) {
    Class<?>[] interceptedMessageTypes = interceptHandler.getInterceptedMessageTypes();
    if (interceptedMessageTypes == null) {
        return InterceptHandler.ALL_MESSAGE_TYPES;
    }
    return interceptedMessageTypes;
}

调用了interceptHandler.getInterceptedMessageTypes();说明每个InterceptHandler都实现了这个方法,即它自身必须告诉别人,它准备注册到哪几个事件上,即上面的七个类型。
一个处理器,可以注册到上面七个中的一个,或多个,或全部,如果返回的是空,则默认是全部。HazelcastInterceptHandler就注册到了所有的上面

到此我们就明白了HazelcastInterceptHandler注册到拦截器的全过程。

三。集群间通信。
我们回答一下上面的问题,看一下configureCluster的全过程

private void configureCluster(IConfig config) throws FileNotFoundException {
    LOG.info("Configuring embedded Hazelcast instance");
    String interceptHandlerClassname = config.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
    if (interceptHandlerClassname == null || !HZ_INTERCEPT_HANDLER.equals(interceptHandlerClassname)) {
        LOG.info("There are no Hazelcast intercept handlers. The server won't start a Hazelcast instance.");
        return;
    }
    String hzConfigPath = config.getProperty(BrokerConstants.HAZELCAST_CONFIGURATION);
    if (hzConfigPath != null) {
        boolean isHzConfigOnClasspath = this.getClass().getClassLoader().getResource(hzConfigPath) != null;
        Config hzconfig = isHzConfigOnClasspath
                ? new ClasspathXmlConfig(hzConfigPath)
                : new FileSystemXmlConfig(hzConfigPath);
        LOG.info("Starting Hazelcast instance. ConfigurationFile={}", hzconfig);
        hazelcastInstance = Hazelcast.newHazelcastInstance(hzconfig);
    } else {
        LOG.info("Starting Hazelcast instance with default configuration");
        hazelcastInstance = Hazelcast.newHazelcastInstance();
    }
    listenOnHazelCastMsg();
}

1.判断是否注册了HazelcastInterceptHandler,如果没有直接跳出方法
2.从config里面获取hazelcast.configuration的位置,并且加载配置文件,同时创建HazelcastInstance实例。这个配置文件在moquette-master-improve/distribution/src/main/resources下有一个模版,也可以参考下面的

    <network>
<public-address>IP1:5701</public-address>
<port>5701</port>
<join>
   <multicast enabled="false" />
   <tcp-ip enabled="true">
          <required-member>IP2:5701</required-member>
   </tcp-ip>
</join>
</network>
    3.监听HazelcastInstance,具体的监听对象是HazelcastListener,这个类刚好和HazelcastInterceptHandler构成一对。刚好用来完成集群间的同步

先看一下HazelcastInterceptHandler,里面只有一个方法,onPublish,即监听事件的发生,发送消息。这里面我最初有一个疑惑,因为我总觉得HazelcastInterceptHandler其实是没必要注册到所有的事件上的,但是它确实是注册到了七个事件。直到看到这个里面只有一个onPublish,我才明白,原来,它确实是注册了七个事件,brokerintercept也确实都回调了它,但是它只实现了这一个方法,其他的方法都是通过抽象方法继承来的空方法,也就是什么都没做。这样当然扩展性稍微好一点,主动权在处理器上面,但其实没有太大的必要。因为每个处理器想监听哪些事件,其实是知道的,监听几个注册几个就完了。
然后看一下HazelcastListener,里面也只有一个方法,onMessage,即当有集群中的其他节点发送消息给当前机器的时候,由Hazelcast,调用HazelcastListener的onMessage方法,看一下这个方法的逻辑

    public void onMessage(Message<HazelcastMsg> msg) {
    try {
        if (!msg.getPublishingMember().equals(server.getHazelcastInstance().getCluster().getLocalMember())) {
            HazelcastMsg hzMsg = msg.getMessageObject();
            LOG.info("{} received from hazelcast for topic {} message: {}", hzMsg.getClientId(), hzMsg.getTopic(),
                hzMsg.getPayload());
            // TODO pass forward this information in somehow publishMessage.setLocal(false);

            MqttQoS qos = MqttQoS.valueOf(hzMsg.getQos());
            MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
            MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(hzMsg.getTopic(), 0);
            ByteBuf payload = Unpooled.wrappedBuffer(hzMsg.getPayload());
            MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, varHeader, payload);
            server.internalPublish(publishMessage, hzMsg.getClientId());
        }
    } catch (Exception ex) {
        LOG.error("error polling hazelcast msg queue", ex);
    }
}

逻辑比较简单,构建消息体,调用server.internalPublish(publishMessage, hzMsg.getClientId());原来Server里面的这个方法干这个用的,我之前看到这个方法是一脸懵逼呀。说白了当收到其他的节点发过来的消息的时候,伪造成某个client的publish报文,接着往下看
这个方法内部其实调用的是ProtocolProcessor.internalPublish(msg, clientId),这个方法里面调用的是MessagesPublisher.publish3Subscribers(toStoreMsg, topic),这个方法很眼熟,是因为在这一篇中讲过,https://blog.51cto.com/13579730/2074290

因为Qos0PublishHandler,Qos1PublishHandler,Qos2PublishHandler底层都调的这个方法,至此发现集群间的消息同步,其实就是模仿的client向broker发送publish消息,或者可以理解成:
当某个broker节点收到来自一个连接到它的client发送的publish消息的时候,它不仅会分发给订阅这个消息并且连接到它的其他clients,同时会把消息分发给集群中的其他节点,以供集群中其他节点继续分发下去。
终于把集群讲完了。后面还有一篇,讲topic目录树的,这个系列就算完结了。哈哈

返回开发技术教程...