ActiveMQ
概述
ActiveMQ是一种开源的、多协议、基于Java的消息代理。
具备消息队列的特性:解耦、异步、消峰。即应用间通信耦合度降低,异步处理应用间消息,保护消费系统免受冲击。
安装
下载安装包解压即可使用,需要提前安装好对应的JDK;
相关操作均在对应的安装目录下的bin目录下进行;
- 启停命令
# 启动
./activemq start
# 停止
./activemq stop
# 指定日志启动,默认的日志记录在安装目录下data/activemq.log
./activemq start > /xx/xx.log
# 启动一个activemq实例(指定配置文件)
./activemq start xbean:file:/xx/xx.xml
- 端口信息
进程默认占用端口为61616;
web界面控制台默认端口为8161;
- web界面默认账户密码为admin/admin,也可以通过修改conf目录下的jetty-realm.properties文件添加用户或者修改密码;
此处为添加一个zmyx用户(zmyx/zmyx),角色为admin;
Hello World
在消息模型中,存在以下角色:生产者,消费者,消息队列,消息;
消息生产者将消息发送到MQ,MQ以队列或者Topic的形式存储消息,消费者从MQ处消费消息;
将消息发送至队列(Queue)并消费
生产者:
package site.zhaoyangjue.mq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class QueueProducer {
private static final String ACTIVE_URL = "tcp://192.168.80.82:61616";
private static final String QUEUE_NAME = "messageQueue";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 开启连接
connection.start();
// 获取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 设置消息发送目的地,此处为queue
Destination destination = session.createQueue(QUEUE_NAME);
// 创建生产者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 5; i++) {
// 创建消息
TextMessage textMessage = session.createTextMessage("消息-" + i);
// 发送消息
producer.send(textMessage);
}
// 关闭session和连接
session.close();
connection.close();
System.out.println("消息发送成功");
}
}
此时可以从控制台上看到对应的消息队列中的信息:
此处PendingMessage标识的是等待消费的消息;
Consumers标识消费者的数量,此时消费者端没有启动,所以此处为0;
Enqueued标识进入队列的消息,此处标识有5条消息进入队列;
Dequeued标识出队列的消息,因为没有消费者消费,此时数量为0;
消费者:
package site.zhaoyangjue.mq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class QueueConsumer {
private static final String ACTIVE_URL = "tcp://192.168.80.82:61616";
private static final String QUEUE_NAME = "messageQueue";
public static void main(String[] args) throws JMSException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 开启连接
connection.start();
// 获取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 设置消息发送目的地,此处为queue
Destination destination = session.createQueue(QUEUE_NAME);
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 接收消息
while(true) {
Message message = consumer.receive(4000L);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("消费者接收到消息:" + textMessage.getText());
} else {
break;
}
}
// 关闭session和connection
session.close();
connection.close();
System.out.println("消息消费成功");
}
}
消费者端执行之后,在消费者端可以看到已经获取到了消息,且MQ控制台上对应的消费数也发生了变化,表明消已成功消费:
基于监听机制的消费者:
package site.zhaoyangjue.mq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class QueueConsumerWithListener {
private static final String ACTIVE_URL = "tcp://192.168.80.82:61616";
private static final String QUEUE_NAME = "messageQueue";
public static void main(String[] args) throws JMSException, IOException {
// 创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
// 开启连接
connection.start();
// 获取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 设置消息发送目的地,此处为queue
Destination destination = session.createQueue(QUEUE_NAME);
// 创建消费者
MessageConsumer consumer = session.createConsumer(destination);
// 设置监听并接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 保持程序运行
System.in.read();
// 关闭session和connection
session.close();
connection.close();
System.out.println("消息消费成功");
}
}
将消息发送至主题(Topic)并消费
与此前队列的代码类似,只不过需要更改下消息的目的地;
// 生产者
Destination destination = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(destination);
// 消费者
Destination destination = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(destination);
不同的是,Topic下,消费者需要先启动,生产者后启动。
多个消费者消费场景
-
对于Queue而言,多个消费者同时消费,MQ会均衡分配消息进行消费;
先运行的消费者会消费对应的消息,后运行的消费者则拿不到此前的消息。
-
对于Topic而言,对于已订阅的消费者,MQ会将订阅之后的消息发送给各个消费者。
消息
一条完整的消息包含了消息头、消息体、消息属性三部分;
消息头
JMSDestination:消息发送的终点,Topic或者Queue;
JMSDeliveryMode:持久化模式和非持久化模式:持久化则表示服务宕机之后,消息不会丢失,非持久化表示服务宕机之后,消息会被丢失。
JMSExpiration:设置消息在一定时间后过期,默认是永不过期;
JMSPriority:消息优先级,0-9十个级别,默认为4级;
JMSMessageID:唯一标识一条消息;
消息体
消息格式有以下五种,常用的是TextMessage和MapMessage两种:
TextMessage:普通字符串消息;
MapMessage:一个Map类型的消息,Key为String,Value为基本类型;
BytesMessage:二进制数组消息;
StreamMessage:Java数据流消息,用标准流操作来顺序的填充和读取;
ObjectMessage:对象消息,包含一个可序列化的java对象;
消息属性
除了消息头设置的字段之外的其他值,都可以作为消息属性。
消息的可靠性从三方面可以体现:消息持久化、消息事务、消息签收;
消息持久化
ActiveMQ下Queue默认是持久化的,Topic默认是非持久化的;
Topic持久化之后,只要消费者订阅过,都可以消费到订阅之后的消息;
// 此为订阅操作,订阅之后消费者可以下线,也可以在线;
connection.setClientID("消费者1号");
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"sub1");
消息事务
生产者和消费者均可以设置消息事务:
生产者设置事务,只有消息被commit之后消息才会被提交,否则不会被提交;此处的true即为开启事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 提交事务
session.commit();
// 异常之后事务回滚
session.rollback();
消息签收
在有事务的情况下:事务被成功提交则消息自动被签收;事务回滚,则消息会被再次传送;
在没有事务的情况下:消息的确认需要由应答模式来决定,默认为自动签收,如果设置了手动签收,则以手动签收为主。
综述:消息事务高于消息签收。
消息持久化机制
Activemq的持久化机制有AMQ、KahaDB、LevelDB、JDBC等方式;
- AMQ:基于文件的存储方式,即将消息写入指定文件来实现;文件大小默认为32M,当文件中的消息被消费了之后,就会被标记为清除,在清除阶段会将该文件清理。适用于V5.3之前的版本;
- KahaDB:基于日志文件的存储方式,V5.4+的默认持久化机制;
使用事务日志和索引文件来存储所有地址,消息被写入.log文件中,默认文件大小为32M,如果.log中的消息不再被引用时则被删除或者归档;
.data文件:包含了持久化的索引信息,内中记录的索引指向了.log中的具体消息。本质是一个BTree;
.free文件:记录了.data文件中记录的空页面的ID;
.redo文件:用于消息恢复,类似与mysql中的redoLog作用;
lock文件:记录了当前有读写权限的broker。
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
<!--
journalDiskSyncStrategy:磁盘同步策略:
always(默认):每次日志写入之后都有磁盘同步,即在每次消息写入时都需要记录日志,最安全,速度最慢;
periodic(间隔写入):在写入消息时,磁盘将按设定的时间间隔同步,默认间隔为1秒;此种情况下最多丢失1秒内的消息。
never:永远不会显式调用同步,它将由操作系统刷新到磁盘,此种策略最快,但是最不安全。
-->
- LevelDB:基于文件的本地数据库存储形式,不使用BTree实现来索引预写日志,使用基于LevelDB的索引。速度比kahaDB要快。
<persistenceAdapter>
<levelDB directory = "activemq-data">
</persistenceAdapter>
- JDBC:将持久化消息写入外部数据库中存储;
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds">
</persistenceAdapter>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destory-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://xx:xx/activemq?relaxAutoCommit=true"/>
<property name="username" value="xxxx"/>
<property name="password" value="xxxx"/>
</bean>
使用dataSource指定数据库连接的bean,会在配置之后重启broker的时候在数据库中生成指定名称的三张表,用于消息存储和记录;
ACTIVEMQ_MSGS:消息记录表:
对于Queue而言,消息会被保存在该表中,消息被消费之后,则会清楚该表中的记录;
对于Topic而言,消息在生产者发送之后,会记录在msgs表中,消费者消费之后不会被清除。
ACTIVEMQ_ACKS:消息签收表:对于Topic,有消费者订阅之后,会写入ack表中;
ACTIVEMQ_LOCK:消息锁定表:记录的是当前集群中的主节点的ID。
- jdbc+Journal机制:消息产生之后,会将消息写入到Jouranl文件中,消费者在数据同步到数据库之前消费了部分消息,那么只有剩下的部分消息需要同步到数据库,减少了数据库的写入频率,效果更好。
传输协议
activeMQ支持的传输协议有很多:TCP/NIO/AMQP/MQTT等等;
- TCP:默认协议,端口占用为61616;传输可靠性高,稳定性强。
tcp://hostname:port?key=value&key=value...
- NIO:性能更好的类TCP协议,可用于性能优化,更适合于高频访问及负载更大的场景使用。
<transportConnectors>
<transportConnector name="nio" uri="nio://0.0.0.0:61616"/>
</transportConnectors>
- auto:auto协议可以覆盖原生的OpenWire(TCP),AMQP,STOMP,MQTT等协议;
<transportConnector name="auto" uri="auto://localhost:5671"/>
<!--也可以使用auto.protocols指定只能使用默认的OpenWire协议和stomp协议-->
<transportConnector name="auto" uri="auto://localhost:5671?auto.protocols=default,stomp"/>
- auto+xx:可以使用+连接auto和其他协议,即xx协议也可适配;
<!--此协议表示服务端接收OpenWire(TCP),AMQP,STOMP,MQTT、NIO协议-->
<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>
集群-高可用
activeMQ使用集群实现高可用,常见的方式有基于zookeeper+LevelDB方式、基于JDBC、基于可复制的LevelDB。
基于可复制的LevelDB:为主从机制,主节点提供服务,从节点不提供服务,只提供数据复制。
高级特性
避免重复消费
根据消息ID进行判断,如果此ID已经被消费过,则无需再消费;
可将消息ID作为写入数据库的唯一主键,如果已经存在,则重复消费的记录无法写入到数据库中;
也可将已消费的消息ID及消息写入缓存,比如redis中,在新消息过来之后,只需要比对对应的消息ID,如果已经在缓存中存在,则无需处理,为重复消息,如果没有,则可以进行消费。
消息消费的重试机制
发生场景:
1.使用事务会话并且调用了rollback();
2.事务在commit()之前,会话关闭了;
3.会话使用CLIENT_ACKNOWLEDGE,且Session.recover()被调用;
4.客户端连接超时。
消息会间隔1秒进行重发,重发6次,如果6次之后还是失败,则会发送一条“posion ack”,来表示此消息有问题,broker不会重发,将该消息放入死信队列中。
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500); // 初始重新投递延时
policy.setBackOffMultiplier(2); // 重连时间间隔递增倍数
policy.setUseExponentialBackOff(true); // 是否启用指数递增的方式增加延迟时间
policy.setMaximumRedeliveries(3); // 最大重新投递次数
死信队列
用于存放异常消息的队列。
<!--对queue使用policyEntry中的“>”表示针对所有队列-->
<policyEntry queue=">">
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
</policyEntry>
相关属性:
1.processExpired:是否将过期消息放入死信队列,默认为true;
2.deadLetterQueue:指定死信队列的名称;
3.queuePrefix:指定死信队列的名称前缀;
4.useQueueForQueueMessages:是否将队列死信放入死信队列中;
5.useQueueForTopicMessages: 是否将Topic的死信放入死信队列中;
6.processNonPersistent:是否将非持久消息写入死信队列;
延迟或定时投递
适配需要延迟或者定时发送的场景;均在发送消息时设置,即在生产者端设置。
<!--在配置文件中broker节点中需要开启schedulerSupport属性支持-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" schedulerSupport="true" dataDirectory="${activemq.data}">
相关属性设置:
1.AMQ_SCHEDULED_DELAY:延迟投递的时间;Long
2.AMQ_SCHEDULED_PERIOD:重复投递的时间间隔;Long
3.AMQ_SCHEDULED_REPEAT:重复投递的次数;int
4.AMQ_SCHEDULED_CRON:使用cron表达式;String
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message)
异步发送
可以增加broker的吞吐量,但可能存在少量消息丢失的情况;
设置方式如下:
1.tcp://xx:61616?jms.useAsyncSend=true;
2.connectionFactory.setUseAsyncSend(true);
3.connecton.setUseAsyncSend(true);
该模式下需要接收消息回执并判断是否发送成功。