这篇文章主要介绍“activemq特性是什么”,在日常操作中,相信很多人在activemq特性是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”activemq特性是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
activemq特点:用通配符订阅多个destination,用组合发布多重destionation
activemq支持destination的层次结构【topic和queen】便于归类和管理。
通配符有三个:
. 用来分隔路径
* 用来匹配路径中的一节
> 用来匹配任意节的路径
opics: <sport><League>.<team>
。例如: football.division.leeds。 如果leeds 参加两种运动--Scccer 和 Rugby,为了方便,我们希望通过一个消息消费者而看到Leeds两种运动的最新战绩,这个时候,通配符就有用武之地了
. : used to separate elements in the destination name
* : used to match one element
> : match one or all trailing elements
所以,对于上面的例子, 你可以订阅这样的主题: *.*.Leeds
如果你想知道division1 这个赛区的所有分数, 你可以订阅这个: soccer.division1.*
如果你想知道Rugby的分数: 你可以订阅这个: rugby.>.
然而, 通配符中是为消费者服务的,如果你发送了这样的一个主题: rugby.>., 这个消息仅会发送到命名了rugby.>.的主题,并不是所有的主题都是以rugby开头的。
这里有一种 方法,使消息生产者能将一条
消息发送到多个目的地。通过使用 composite destination。
将同一条消息发送到不同的目的地是很有用的。 比如一个用来存储信息的应用,会发送一条消息给队列
同时也要将这条消息广播给监控的所有系统。通常,你会通过用两个producer 发送两次消息来达到这个目的。composite destination就是用来解决这种情况的
例如,如果你创建了名子为: store.order.backoffice,store.order.warehouse 的 Queue,这样 就会发送同时两个Queue。
订阅信息 解释
PRICE.> Any price for any product on any exchange
PRICE.STOCK.> Any price for a stock on any exchange
PRICE.STOCK.NASDAQ.* Any stock price on NASDAQ
PRICE.STOCK.*.IBM Any IBM stock price on any exchange
从5.5 版本以后,可以自定义路径分隔符:
<plugins>
.....
<destinationPathSeparatorPlugin/>
</plugins>
此时FOO.BAR.* 可以表示为 FOO/BAR/*
也可以通过pathSeparator 属性定义其他符号位路径分隔符。
public void subscribeToLeeds() throws JMSException {
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic allLeeds = session.createTopic("*.*.Leeds");
MessageConsumer consumer = session.createConsumer(allLeeds);
Message result = consumer.receive();
}
11.1.2发送一个message到多重destinations
发送相同的message到不同的destination上:案列发送一个[queen,opic]组合模式,默认的组合destination用,分隔
列如store.order.backoffice,store.order.warehouse
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue ordersDestination = session.createQueue("store.orders, topic://store.orders");
MessageProducer producer = session.createProducer(ordersDestination);
Message order = session.createObjectMessage();
producer.send(order);
11.2通知消息
单的说就是实现了ActiveMQ的broker上各种操作的记录跟踪和通知。
使用这个功能,你可以实时的知道broker上
创建或销毁了连接,
添加或删除了生存者或消费者,
添加或删除了主题或队列,
有消息发送和接收,
什么时候有慢消费者,
什么时候有快生产者
什么时候什么消息被丢弃
什么时候broker被添加到集群(主从或是网络连接)
这个机制是ActiveMQ对JMS协议的重要补充,也是基于JMS实现的ActiveMQ的可管理性的一部分。多个ActiveMQ的相互协调和互操作的基础设置。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
MessageConsumer consumer = session.createConsumer(connectionAdvisory);
ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
DataStructure data = (DataStructure) message.getDataStructure();
if (data.getDataStructureType() == ConnectionInfo.DATA_STRUCTURE_TYPE) {
ConnectionInfo connectionInfo = (ConnectionInfo) data;
System.out.println("Connection started: " + connectionInfo);
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
RemoveInfo removeInfo = (RemoveInfo) data;
System.out.println("Connection stopped: " + removeInfo.getObjectId());
} else {
System.err.println("Unknown message " + data);
}
大多数advisor消息都是完整的对于destiation,但是呢advisorysupport类有一些方法来决定监听哪个advisorytopic,你也能使用通配符-
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
// Lets first create a Consumer to listen too
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Lets first create a Consumer to listen too
Queue queue = session.createQueue("test.Queue");
MessageConsumer testConsumer = session.createConsumer(queue);
// so lets listen for the Consumer starting/stoping
Topic advisoryTopic = org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
MessageConsumer consumer = session.createConsumer(advisoryTopic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
ActiveMQMessage message = (ActiveMQMessage) m;
try {
System.out.println("Consumer Count = " + m.getStringProperty("consumerCount"));
DataStructure data = (DataStructure) message.getDataStructure();
if (data.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE) {
ConsumerInfo consumerInfo = (ConsumerInfo) data;
System.out.println("Consumer started: " + consumerInfo);
} else if (data.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
RemoveInfo removeInfo = (RemoveInfo) data;
System.out.println("Consumer stopped: " + removeInfo.getObjectId());
} else {
System.err.println("Unknown message " + data);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
testConsumer.close();
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" advisoryForSlowConsumers="true">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
ActiveMQ中,topic只有在持久订阅(durablesubscription)下是持久化的。存在持久订阅时,每个持久订阅者,都相当于一个持久化的queue的客户端,它会收取所有消息。这种情况下存在两个问题:
1. 同一应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个都会获取所有消息。queue模式可以解决这个问题,broker
端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能jms规范本身是没有的。
2. 同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高,为了解决这两个问题,ActiveMQ中实现了虚拟
Topic的功能。使用起来非常简单。对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为
队列的名称,即可表明自己的身份即可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。
可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订
阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection consumerConnection = connectionFactory.createConnection();
consumerConnection.start();
Session consumerSessionA = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue consumerAQueue = consumerSessionA.createQueue("Consumer.A.VirtualTopic.orders");
MessageConsumer consumerA = consumerSessionA.createConsumer(consumerAQueue);
Session consumerSessionB = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Queue consumerBQueue = consumerSessionB.createQueue("Consumer.B.VirtualTopic.orders");
MessageConsumer consumerB = consumerSessionB.createConsumer(consumerAQueue);
//setup the sender
Connection senderConnection = connectionFactory.createConnection();
senderConnection.start();
Session senerSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic ordersDestination = senerSession.createTopic("VirtualTopic.orders");
MessageProducer producer = senerSession.createProducer(ordersDestination);
同样queue名称的消费者会平分所有消息。
从queue接收到的消息,message.getJMSDestination().toString()为topic://VirtualTopic.TEST,即原始的destination。消息的persistent属性为true,即每个相当于一个持久订阅。
Virtual Topic这个功能特性在broker上有个总开关,useVirtualTopics属性,默认为true,设置为false即可关闭此功能。
当此功能开启,并且使用了持久化的存储时,broker启动的时候会从持久化存储里拿到所有的destinations的名称,如果名称模式与Virtual Topics匹配,则把它们添加到系统的Virtual Topics列表中去。
当然,没有显式定义的Virtual Topics,也可以直接使用的,系统会自动创建对应的实际topic。
当有consumer访问此VirtualTopics时,系统会自动创建持久化的queue,并在每次Topic收到消息时,分发到具体的queue。
可追溯”消费者,只对Topic有效,如果consumer是可追溯的,那么它可以获取实例创建之前的消息。通常而言,订阅者不可能获取实例创建之前的消息,因为broker根本不知道它的存在。对于broker而言,如果
一个Topic通道创建,且有发布者发布消息(Publisher),那么broker将会在内存中(非持久化)或者磁盘中(持久化)保存已经发布的消息,直到所有的订阅者都消费者,才会清除原始消息内容。那么retroactive
类型的订阅者,就可以获取这些原本不属于自己但broker上还保存的旧消息,就像我们订阅一种Feed,可以立即获取旧的内容列表一样。如果此订阅者不是durable(耐久的),它可以获取最近发布的一些消息;如果是durable,它可以获取存储器中尚未删除的所有的旧消息。[下文会详细介绍Topic的数据转发模型]
//在destinationUrl中设置,默认为false
feedTopic?consumer.retroactive=true
在broker端,可以配置当前Topic默认为“可追溯的”,不过Topic并不会在此种情况下额外的保存消息,只不过表示订阅者默认都是可追溯的而已。
<!-- 只对topic有效,默认为false -->
<policyEntry topic="feedTopic" alwaysRetroactive="true" />
String brokerURI = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURI);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("soccer.division1.leeds?consumer.retroactive=true");
MessageConsumer consumer = session.createConsumer(topic);
Message result = consumer.receive();
redeliveryPolicy
consumer使用的重发策略,当消息在client端处理失败(比如onMessage方法抛出异常,事务回滚等),将会触发消息重发。对于Broker端,需要重发的消息将会被立即发送(如果broker端使用异步发送,
且发送队列中还有其他消息,那么重发的消息可能不会被立即到达Consumer)。我们通过此Policy配置最大重发次数、重发频率等,如果你的Consumer客户端处于不良网络环境中,可以适当调整相关参数。参数列表,
请参见(RedeliveryPolicy)
//在brokerUrl中设置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6
. redeliveryPolicy
RedelieveryPolicy policy=connection.getRedelieveryPolicy();
policy.setInitialRedelieveryDelay(500);
policy.setBackOffMultiplier(2)
policy.setUseExponentialBackOff(true)
policy.setMaximumRedelieveries(2)
DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息。
出现以下情况时,消息会被redelivered
A transacted session is used and rollback() is called.
A transacted session is closed before commit is called.
A session is using CLIENT_ACKNOWLEDGE and Session.recover() is called.
当一个消息被redelivered超过maximumRedeliveries(缺省为6次,具体设置请参考后面的链接)次数时,会给broker发送一个"Poison ack",这个消息被认为是a poison pill,这时broker会将这
消息发送到DLQ,以便后续处理。缺省的死信队列是ActiveMQ.DLQ,如果没有特别指定,死信都会被发送到这个队列。缺省持久消息过期,会被送到DLQ,非持久消息不会送到DLQ可以通过配置文件(activemq.xml)
来调整死信发送策略
<destinationPolicy>
<policyMap>
<policyEntries>
<!— 设置所有队列,使用 '>' ,否则用队列名称 -->
<policyEntry queue=">">
<deadLetterStrategy>
<!--
queuePrefix:设置死信队列前缀
useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processExpired="false" processNonPersistent="false"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
...
</broker>
在一个电子系统中可能接受来自不同供应商的各种订单信息,不同类型的订单走的流程不尽相同,为了快速处理各种不同的订单完成不同的业务。特定义不同的路由 信息。根据路由信息的不同,将消息进行不同的处理。如果采用ActiveMQ那么最好采用apache-camel整合,使不同的消息根据不同的流程自动 处理到不同的队列中去。
<beans>
<broker brokerName="testBroker">
<transportConnectors>
<transportConnector uri="tcp://localhos:61616">
</transportConnectors>
<import resource="camel.xml">
</beans>
到此,关于“activemq特性是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注天达云网站,小编会继续努力为大家带来更多实用的文章!