红泥小火炉


ActiveMQ

Nathaniel 2022-02-04 1120浏览 0条评论
首页/正文
分享到: / / / /

ActiveMQ

概述

ActiveMQ是一种开源的、多协议、基于Java的消息代理。

具备消息队列的特性:解耦、异步、消峰。即应用间通信耦合度降低,异步处理应用间消息,保护消费系统免受冲击。

安装

下载安装包解压即可使用,需要提前安装好对应的JDK;

相关操作均在对应的安装目录下的bin目录下进行;

  • 启停命令
# 启动
./activemq start 
# 停止
./activemq stop
# 指定日志启动,默认的日志记录在安装目录下data/activemq.log
./activemq start > /xx/xx.log
# 启动一个activemq实例(指定配置文件)
./activemq start xbean:file:/xx/xx.xml
  • 端口信息
进程默认占用端口为61616;
web界面控制台默认端口为8161;
  • web界面默认账户密码为admin/admin,也可以通过修改conf目录下的jetty-realm.properties文件添加用户或者修改密码;

image-20220201170123454

此处为添加一个zmyx用户(zmyx/zmyx),角色为admin;

Hello World

在消息模型中,存在以下角色:生产者,消费者,消息队列,消息;

消息生产者将消息发送到MQ,MQ以队列或者Topic的形式存储消息,消费者从MQ处消费消息;

将消息发送至队列(Queue)并消费

生产者:

package site.zhaoyangjue.mq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class QueueProducer {
    
    private static final String ACTIVE_URL = "tcp://192.168.80.82:61616";
    private static final String QUEUE_NAME = "messageQueue";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 开启连接
        connection.start();
        // 获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 设置消息发送目的地,此处为queue
        Destination destination = session.createQueue(QUEUE_NAME);
        // 创建生产者
        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < 5; i++) {
            // 创建消息
            TextMessage textMessage = session.createTextMessage("消息-" + i);
            // 发送消息
            producer.send(textMessage);
        }
        // 关闭session和连接
        session.close();
        connection.close();

        System.out.println("消息发送成功");
    }
}

此时可以从控制台上看到对应的消息队列中的信息:

image-20220201175700760

此处PendingMessage标识的是等待消费的消息;

​ Consumers标识消费者的数量,此时消费者端没有启动,所以此处为0;

​ Enqueued标识进入队列的消息,此处标识有5条消息进入队列;

​ Dequeued标识出队列的消息,因为没有消费者消费,此时数量为0;

消费者:

package site.zhaoyangjue.mq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class QueueConsumer {
    
    private static final String ACTIVE_URL = "tcp://192.168.80.82:61616";
    private static final String QUEUE_NAME = "messageQueue";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 开启连接
        connection.start();
        // 获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 设置消息发送目的地,此处为queue
        Destination destination = session.createQueue(QUEUE_NAME);
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        // 接收消息
        while(true) {
            Message message = consumer.receive(4000L);
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("消费者接收到消息:" + textMessage.getText());
            } else {
                break;
            }
        }
        // 关闭session和connection
        session.close();
        connection.close();
        System.out.println("消息消费成功");
    }
}

消费者端执行之后,在消费者端可以看到已经获取到了消息,且MQ控制台上对应的消费数也发生了变化,表明消已成功消费:

image-20220201180254777

image-20220201180325594

基于监听机制的消费者:

package site.zhaoyangjue.mq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class QueueConsumerWithListener {
    
    private static final String ACTIVE_URL = "tcp://192.168.80.82:61616";
    private static final String QUEUE_NAME = "messageQueue";

    public static void main(String[] args) throws JMSException, IOException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 开启连接
        connection.start();
        // 获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 设置消息发送目的地,此处为queue
        Destination destination = session.createQueue(QUEUE_NAME);
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        // 设置监听并接收消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者接收到消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // 保持程序运行
        System.in.read();
        // 关闭session和connection
        session.close();
        connection.close();
        System.out.println("消息消费成功");
    }
}

将消息发送至主题(Topic)并消费

与此前队列的代码类似,只不过需要更改下消息的目的地;

// 生产者
Destination destination = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(destination);
// 消费者
Destination destination = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(destination);

不同的是,Topic下,消费者需要先启动,生产者后启动。

多个消费者消费场景

  • 对于Queue而言,多个消费者同时消费,MQ会均衡分配消息进行消费;

    先运行的消费者会消费对应的消息,后运行的消费者则拿不到此前的消息。

  • 对于Topic而言,对于已订阅的消费者,MQ会将订阅之后的消息发送给各个消费者。

消息

一条完整的消息包含了消息头、消息体、消息属性三部分;

消息头

JMSDestination:消息发送的终点,Topic或者Queue;
JMSDeliveryMode:持久化模式和非持久化模式:持久化则表示服务宕机之后,消息不会丢失,非持久化表示服务宕机之后,消息会被丢失。
JMSExpiration:设置消息在一定时间后过期,默认是永不过期;
JMSPriority:消息优先级,0-9十个级别,默认为4级;
JMSMessageID:唯一标识一条消息;

消息体

消息格式有以下五种,常用的是TextMessage和MapMessage两种:
TextMessage:普通字符串消息;
MapMessage:一个Map类型的消息,Key为String,Value为基本类型;
BytesMessage:二进制数组消息;
StreamMessage:Java数据流消息,用标准流操作来顺序的填充和读取;
ObjectMessage:对象消息,包含一个可序列化的java对象;

消息属性

除了消息头设置的字段之外的其他值,都可以作为消息属性。

消息的可靠性从三方面可以体现:消息持久化、消息事务、消息签收;

消息持久化

ActiveMQ下Queue默认是持久化的,Topic默认是非持久化的;

Topic持久化之后,只要消费者订阅过,都可以消费到订阅之后的消息;

// 此为订阅操作,订阅之后消费者可以下线,也可以在线;
connection.setClientID("消费者1号");
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"sub1");

消息事务

生产者和消费者均可以设置消息事务:
生产者设置事务,只有消息被commit之后消息才会被提交,否则不会被提交;此处的true即为开启事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 提交事务
session.commit();
// 异常之后事务回滚
session.rollback();

消息签收

在有事务的情况下:事务被成功提交则消息自动被签收;事务回滚,则消息会被再次传送;
在没有事务的情况下:消息的确认需要由应答模式来决定,默认为自动签收,如果设置了手动签收,则以手动签收为主。
综述:消息事务高于消息签收。

消息持久化机制

Activemq的持久化机制有AMQ、KahaDB、LevelDB、JDBC等方式;

  • AMQ:基于文件的存储方式,即将消息写入指定文件来实现;文件大小默认为32M,当文件中的消息被消费了之后,就会被标记为清除,在清除阶段会将该文件清理。适用于V5.3之前的版本;
  • KahaDB:基于日志文件的存储方式,V5.4+的默认持久化机制;
使用事务日志和索引文件来存储所有地址,消息被写入.log文件中,默认文件大小为32M,如果.log中的消息不再被引用时则被删除或者归档;
.data文件:包含了持久化的索引信息,内中记录的索引指向了.log中的具体消息。本质是一个BTree;
.free文件:记录了.data文件中记录的空页面的ID;
.redo文件:用于消息恢复,类似与mysql中的redoLog作用;
lock文件:记录了当前有读写权限的broker。
<persistenceAdapter>
   <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
<!--
journalDiskSyncStrategy:磁盘同步策略:
always(默认):每次日志写入之后都有磁盘同步,即在每次消息写入时都需要记录日志,最安全,速度最慢;
periodic(间隔写入):在写入消息时,磁盘将按设定的时间间隔同步,默认间隔为1秒;此种情况下最多丢失1秒内的消息。
never:永远不会显式调用同步,它将由操作系统刷新到磁盘,此种策略最快,但是最不安全。
-->
  • LevelDB:基于文件的本地数据库存储形式,不使用BTree实现来索引预写日志,使用基于LevelDB的索引。速度比kahaDB要快。
<persistenceAdapter>
	<levelDB directory = "activemq-data">
</persistenceAdapter>
  • JDBC:将持久化消息写入外部数据库中存储;
<persistenceAdapter>
	<jdbcPersistenceAdapter dataSource="#mysql-ds">
</persistenceAdapter>

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destory-method="close">
	<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
	<property name="url" value="jdbc:mysql://xx:xx/activemq?relaxAutoCommit=true"/>
	<property name="username" value="xxxx"/>
	<property name="password" value="xxxx"/>
</bean>

使用dataSource指定数据库连接的bean,会在配置之后重启broker的时候在数据库中生成指定名称的三张表,用于消息存储和记录;

ACTIVEMQ_MSGS:消息记录表:
	对于Queue而言,消息会被保存在该表中,消息被消费之后,则会清楚该表中的记录;
	对于Topic而言,消息在生产者发送之后,会记录在msgs表中,消费者消费之后不会被清除。
ACTIVEMQ_ACKS:消息签收表:对于Topic,有消费者订阅之后,会写入ack表中;
ACTIVEMQ_LOCK:消息锁定表:记录的是当前集群中的主节点的ID。
  • jdbc+Journal机制:消息产生之后,会将消息写入到Jouranl文件中,消费者在数据同步到数据库之前消费了部分消息,那么只有剩下的部分消息需要同步到数据库,减少了数据库的写入频率,效果更好。

传输协议

activeMQ支持的传输协议有很多:TCP/NIO/AMQP/MQTT等等;

  • TCP:默认协议,端口占用为61616;传输可靠性高,稳定性强。
tcp://hostname:port?key=value&key=value...
  • NIO:性能更好的类TCP协议,可用于性能优化,更适合于高频访问及负载更大的场景使用。
<transportConnectors>
    <transportConnector name="nio" uri="nio://0.0.0.0:61616"/>  
</transportConnectors>
  • auto:auto协议可以覆盖原生的OpenWire(TCP),AMQP,STOMP,MQTT等协议;
<transportConnector name="auto" uri="auto://localhost:5671"/>
<!--也可以使用auto.protocols指定只能使用默认的OpenWire协议和stomp协议-->
<transportConnector name="auto" uri="auto://localhost:5671?auto.protocols=default,stomp"/>
  • auto+xx:可以使用+连接auto和其他协议,即xx协议也可适配;
<!--此协议表示服务端接收OpenWire(TCP),AMQP,STOMP,MQTT、NIO协议-->
<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>

集群-高可用

activeMQ使用集群实现高可用,常见的方式有基于zookeeper+LevelDB方式、基于JDBC、基于可复制的LevelDB。

基于可复制的LevelDB:为主从机制,主节点提供服务,从节点不提供服务,只提供数据复制。

高级特性

避免重复消费

根据消息ID进行判断,如果此ID已经被消费过,则无需再消费;
	可将消息ID作为写入数据库的唯一主键,如果已经存在,则重复消费的记录无法写入到数据库中;
	也可将已消费的消息ID及消息写入缓存,比如redis中,在新消息过来之后,只需要比对对应的消息ID,如果已经在缓存中存在,则无需处理,为重复消息,如果没有,则可以进行消费。

消息消费的重试机制

发生场景:
	1.使用事务会话并且调用了rollback();
	2.事务在commit()之前,会话关闭了;
	3.会话使用CLIENT_ACKNOWLEDGE,且Session.recover()被调用;
	4.客户端连接超时。
消息会间隔1秒进行重发,重发6次,如果6次之后还是失败,则会发送一条“posion ack”,来表示此消息有问题,broker不会重发,将该消息放入死信队列中。
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500); // 初始重新投递延时
policy.setBackOffMultiplier(2); // 重连时间间隔递增倍数
policy.setUseExponentialBackOff(true); // 是否启用指数递增的方式增加延迟时间
policy.setMaximumRedeliveries(3); // 最大重新投递次数

死信队列

用于存放异常消息的队列。
<!--对queue使用policyEntry中的“>”表示针对所有队列-->
<policyEntry queue=">">
   <deadLetterStrategy>
      <sharedDeadLetterStrategy processExpired="false" />
   </deadLetterStrategy>
</policyEntry>
相关属性:
	1.processExpired:是否将过期消息放入死信队列,默认为true;
	2.deadLetterQueue:指定死信队列的名称;
	3.queuePrefix:指定死信队列的名称前缀;
	4.useQueueForQueueMessages:是否将队列死信放入死信队列中;
	5.useQueueForTopicMessages: 是否将Topic的死信放入死信队列中;
	6.processNonPersistent:是否将非持久消息写入死信队列;

延迟或定时投递

适配需要延迟或者定时发送的场景;均在发送消息时设置,即在生产者端设置。
<!--在配置文件中broker节点中需要开启schedulerSupport属性支持-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" schedulerSupport="true" dataDirectory="${activemq.data}">
相关属性设置:
	1.AMQ_SCHEDULED_DELAY:延迟投递的时间;Long
	2.AMQ_SCHEDULED_PERIOD:重复投递的时间间隔;Long
	3.AMQ_SCHEDULED_REPEAT:重复投递的次数;int
	4.AMQ_SCHEDULED_CRON:使用cron表达式;String
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message)

异步发送

可以增加broker的吞吐量,但可能存在少量消息丢失的情况;

设置方式如下:

1.tcp://xx:61616?jms.useAsyncSend=true;
2.connectionFactory.setUseAsyncSend(true);
3.connecton.setUseAsyncSend(true);

该模式下需要接收消息回执并判断是否发送成功。

最后修改:2022-02-04 18:11:34 © 著作权归作者所有
上一篇

评论列表

还没有人评论哦~赶快抢占沙发吧~