ActiveMQ

一. 初步安装使用

ActiveMQ 的官网 : http://activemq.apache.org

ActiveMQ

扩展出:

​ API 接受发送

​ MQ 的高可用

​ MQ 的集群容错配置

​ MQ 的持久化

​ 延时发送

​ 签收机制

​ Spring/SpringBoot 整合

​ 等

​ // MQ 都需要满足的技术

​ MQ : 消息中间件/消息队列

​ 为什么要使用 MQ ?

​ 解决了耦合调用、异步模型、抵御洪峰流量,保护了主业务,消峰。

​ 在linux 的opt 目录下上传 mq 的压缩包,(使用vmware-tools 上传的)

​ 并且将压缩包放到 /myactivemq 下

​ 直接进入myactivemq 的 文件下的activemq 下的 bin 目录,使用 ./activemq start 命令启动

​ 检查activemq 是否启动的三种方法: 也是三种查看后台进程的方法

ps -ef|grep activemq|grep -v grep      // grep -v  grep 可以不让显示grep 本来的信息
netstat -anp|grep 61616    // activemq 的默认后台端口是61616
lsof -i:61616

​ 让启动的日志信息不在控制台打印,而放到专门的文件中:

./activemq start >  /myactivemq/myrunmq.log

二 . 部署和代码尝试

​ \1. 部署在linux 上的acvtiveMQ 要可以通过前台windows 的页面访问,必须把linux 的IP和 windows的 IP 地址配置到同一个网关下 。这种情况一般都是修改 linux 的IP 地址,修改网卡文件对应的IP 地址

​ 修改linux 的ip 地址:

cd   /etc/sysconfig/network-scripts
vi  ifcfg-eth0 

img

​ 这是修改之后的网卡文件配置,IP 地址为:192.168.17.3 (因为我的windows 的IP 地址为192.168.17.1,将他们配置在了同一个网关下)

​ 配置成功后 ,可以用 windows ping linux , linux ping windows ,当全部ping 通后,可以使用图形化界面访问activeMQ

​ // ActiveMQ 的前台端口为 8161 , 提供控制台服务 后台端口为61616 ,提供 JMS 服务

img

​ // 192.168.17.3 为 linux 的IP 地址, 使用 IP+端口 访问了ActiveMQ , 登陆之后的样子如上。(能访问成功首先得在linux 上启动activeMQ 的服务),首次登录的默认账户密码为 账号:admin 密码:admin

​ 访问不到的坑: 1 可能是你的linux 和 windows 没有在一个网关下

​ 2 可能你windows 的防火墙或者 linux 的防火墙没有关掉(是的,先得关掉防火墙)

​ 3 你忘记启动activemq 的服务了

​ 4 你启动失败了,可能是你得java 环境没配好,必须是jdk 8 或者以上

​ JMS : Java 消息中间件的服务接口规范,activemq 之上是 mq , 而 mq 之上是JMS 定义的消息规范 。 activemq 是mq 技术的一种理论实现(与之相类似的实现还有 Kafka RabbitMQ RockitMQ ),而 JMS 是更上一级的规范。

img

​ 在点对点的消息传递时,目的地称为 队列 queue

​ 在发布订阅消息传递中,目的地称为 主题 topic

​ \2. demo 初试 一个简单的生产者消费者

​ 生产者:

public class JmsProduce {
            //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws  Exception{
        // 1 按照给定的url创建连接工程,这个构造器采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂连接 connection  和 启动
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        //  启动
        connection.start();
        // 3 创建回话  session
        // 两个参数,第一个事务, 第二个签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地 (两种 : 队列/主题   这里用队列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
        for (int i = 1; i < 4 ; i++) {
            // 7  创建字消息
            TextMessage textMessage = session.createTextMessage("msg--" + i);
            // 8  通过messageProducer发布消息
            messageProducer.send(textMessage);
        }
        // 9 关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息发送到MQ完成 ****");
    }
}

img

以及在页面上的显示:

img

与之相对应的消息消费者(处理消息的系统)代码及运行

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String QUEUE_NAME = "queue01";   // 1对1 的队列
    public static void main(String[] args) throws Exception{
        // 1 按照给定的url创建连接工程,这个构造器采用默认的用户名密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂连接 connection  和 启动
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        //  启动
        connection.start();
        // 3 创建回话  session
        // 两个参数,第一个事务, 第二个签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地 (两种 : 队列/主题   这里用队列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        while(true){
            // 这里是 TextMessage 是因为消息发送者是 TextMessage , 接受处理的
            // 也应该是这个类型的消息
            TextMessage message = (TextMessage)messageConsumer.receive();
            if (null != message){
                System.out.println("****消费者的消息:"+message.getText());
            }else {
                break;
            }
        }
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

img

img

​ 这个代表有一个消息消费者处理消息,并且处理了三条消息

     // 通过监听的方式来消费消息
  // 通过异步非阻塞的方式消费消息
  // 通过messageConsumer 的setMessageListener 注册一个监听器,
  // 当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
  messageConsumer.setMessageListener(new MessageListener() {   // 可以用监听器替换之前的同步receive 方法
      public void onMessage(Message message)  {
              if (null != message  && message instanceof TextMessage){
                  TextMessage textMessage = (TextMessage)message;
                  try {
               System.out.println("****消费者的消息:"+textMessage.getText());
              }catch (JMSException e) {
                      e.printStackTrace();
                  }
          }
      }
  });

这里的一点经验: activemq 好像自带负载均衡,当先启动两个队列(Queue)的消费者时,在启动生产者发出消息,此时的消息平均的被两个消费者消费。 并且消费者不会消费已经被消费的消息(即为已经出队的消息)

​ 但是当有多个主题(Topic)订阅者时,发布者发布的消息,每个订阅者都会接收所有的消息。topic 更像是被广播的消息,但是缺点是不能接受已经发送过的消息。

img

​ 先要有订阅者,生产者才有意义。

三 . JMS

1.JAVAEE 是一套使用Java 进行企业级开发的13 个核心规范工业标准 , 包括:

JDBC 数据库连接

JNDI Java的命名和目录接口

EJB Enterprise java bean

RMI 远程方法调用 一般使用TCP/IP 协议

Java IDL 接口定义语言

JSP

Servlet

XML

JMS Java 消息服务

JTA

JTS

JavaMail

JAF

JMS 部件 JMS provider JMS producer JMS consumer JMS message
含义 实现JMS 的消息中间件,也就是MQ服务器 消息生产者,创建和发送消息的客户端 消息消费者,接收和处理消息的客户端 JMS 消息,分为消息头、消息属性、消息体

5 个主要的消息头

消息头 JMSDestination JMSDeliveryMode JMSExpiration JMSPriority JMSMessageId
含义 头在哪儿 是持久还是非持久 过期时间,默认永久 优先级,默认是4 有0~9 ,5-9 是紧急的,0-4 是普通的 唯一的消息ID

消息体;封装具体的消息数据

5 种消息体格式:

5种消息体 TextMessage Mapmessage BytesMessage StreamMessage ObjectMessage
含义 普通字符串消息,包含一个String Map 类型的消息, k-> String v -> Java 基本类型 二进制数组消息,包含一个byte[] Java 数据流消息,用标准流操作来顺序的填充读取 对象消息,包含一个可序列化的Java 对象

发送和接收的消息类型必须一致

消息属性:识别、去重、重点标注

\2. 如何保证消息的可靠性

​ JMS 可靠性:Persistent 持久性 、 事务 、 Acknowledge 签收

2.1 持久化

// 在队列为目的地的时候持久化消息
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 队列为目的地的非持久化消息
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

持久化的消息,服务器宕机后消息依旧存在,只是没有入队,当服务器再次启动,消息任就会被消费。

但是非持久化的消息,服务器宕机后消息永远丢失。 而当你没有注明是否是持久化还是非持久化时,默认是持久化的消息。

对于目的地为主题(topic)来说,默认就是非持久化的,让主题的订阅支持化的意义在于:对于订阅了公众号的人来说,当用户手机关机,在开机后任就可以接受到关注公众号之前发送的消息。

代码实现:持久化topic 的消费者

           ……    // 前面代码相同,不复制了      
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
         // 5 发布订阅
        connection.start();
        Message message = topicSubscriber.receive();// 一直等
         while (null != message){
             TextMessage textMessage = (TextMessage)message;
             System.out.println(" 收到的持久化 topic :"+textMessage.getText());
             message = topicSubscriber.receive(3000L);    // 等1秒后meesage 为空,跳出循环,控制台关闭
         }
   ……

持久化生产者

          ……  
        MessageProducer messageProducer = session.createProducer(topic);
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
        // 设置持久化topic 在启动
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 
        connection.start();
        for (int i = 1; i < 4 ; i++) {
            // 7  创建字消息
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            // 8  通过messageProducer发布消息
            messageProducer.send(textMessage);
            MapMessage mapMessage = session.createMapMessage();
            //    mapMessage.setString("k1","v1");
            //     messageProducer.send(mapMessage);
        }
        // 9 关闭资源
      …… 

img

当生产者启动后:

img

消息被消费,并且: (因为我在receive方法中设置了如果接收到消息后3秒还没有消息就离线,也也可以设置永久存活)

img

2.2 事务

​ createSession的第一个参数为true 为开启事务,开启事务之后必须在将消息提交,才可以在队列中看到消息

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

提交:

session.commit(); 

事务开启的意义在于,如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性。

对于消息消费者来说,开启事务的话,可以避免消息被多次消费,以及后台和服务器数据的不一致性。举个栗子:

如果消息消费的 createSession 设置为 ture ,但是没有 commit ,此时就会造成非常严重的后果,那就是在后台看来消息已经被消费,但是对于服务器来说并没有接收到消息被消费,此时就有可能被多次消费。

2.3 Acknowledge 签收 (俗称ack)

​ 非事务 :

Session.AUTO_ACKNOWLEDGE      自动签收,默认
Session.CLIENT_ACKNOWLEDGE     手动签收
手动签收需要acknowledge   
textMessage.acknowledge();

而对于开启事务时,设置手动签收和自动签收没有多大的意义,都默认自动签收,也就是说事务的优先级更高一些。

Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//Session session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);   //  也是自动签收   
        ……
session.commit();  

但是开启事务没有commit 任就会重复消费

小知识: broker

broker 就是实现了用代码形式启动 ActiveMQ 将 MQ 内嵌到 Java 代码中,可以随时启动,节省资源,提高了可靠性。

就是将 MQ 服务器作为了 Java 对象

使用多个配置文件启动 activemq

cp activemq.xml  activemq02.xml 
// 以active02 启动mq 服务器
./activemq start xbean:file:/myactivemq/apache-activemq-5.15.9/conf/activemq02.xml 

把小型 activemq 服务器嵌入到 java 代码: 不在使用linux 的服务器

需要的包:

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.9.5</version>
</dependency>

代码实现:

public class Embebroker {
    public static void main(String[] args) throws Exception {
        // broker 服务
        BrokerService brokerService = new BrokerService();
        // 把小型 activemq 服务器嵌入到 java 代码
        brokerService.setUseJmx(true);
        // 原本的是 192.……  是linux 上的服务器,而这里是本地windows 的小型mq 服务器
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

四. Spring / SpringBoot 整合 ActiveMQ

\1. 对 Spring 的整合

1.1 所需jar 包

<!--  activeMQ  jms 的支持  -->
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
    <version>4.3.23.RELEASE</version>
</dependency>
<dependency>    <!--  pool 池化包相关的支持  -->
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>5.15.9</version>
</dependency>
<!--  aop 相关的支持  -->
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>4.3.23.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>4.3.23.RELEASE</version>
</dependency>

1.2 写xml 文件 (applicationContext.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop
     http://www.springframework.org/schema/aop/spring-aop.xsd
     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    <context:commponent-scan base-package="com.at.activemq"/>
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://192.168.17.3:61616"></property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>
    <!-- 队列目的地 -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"></constructor-arg>
    </bean>
    <!--  jms 的工具类 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>

1.3 编写代码:

@Service
public class SpringMQ_producer {
    @Autowired
    private JmsTemplate jmsTemplate;
    public static void main(String[] args) {
        ApplicationContext  ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
        SpringMQ_producer producer = (SpringMQ_producer) ctx.getBean("springMQ_Producer");
        producer.jmsTemplate.send((session) -> {
            TextMessage textMessage = session.createTextMessage("spring 和 activemq 的整合");
            return textMessage;
        });
        System.out.println(" *** send task over ***");
    }
}
@Service
public class Spring_MQConsummer {
    @Autowired
    private JmsTemplate jmsTemplate;
    public static void main(String[] args) {
        ApplicationContext  ac = new ClassPathXmlApplicationContext("applicationContext.xml");
        Spring_MQConsummer  sm = (Spring_MQConsummer)ac.getBean("spring_MQConsummer");
        String s = (String) sm.jmsTemplate.receiveAndConvert();
        System.out.println(" *** 消费者消息"+s);
    }
}

并且可以在spring 中设置监听器,不用启动消费者,就可以自动监听到消息,并处理

\2. Spring Boot 整合 ActiveMQ

2.1 建立boot 项目,配置 pom.xml 配置 application.yml 配置 bean

2.2 编写生产者 编写启动类 测试类

按键触发消息和定时发送消息的业务代码:

// 调用一次一个信息发出
public void produceMessage(){
    jmsMessagingTemplate.convertAndSend(queue,"****"+ UUID.randomUUID().toString().substring(0,6));
}
// 带定时投递的业务方法
@Scheduled(fixedDelay = 3000)    // 每3秒自动调用
public void produceMessageScheduled(){
    jmsMessagingTemplate.convertAndSend(queue,"** scheduled **"+ UUID.randomUUID().toString().substring(0,6));
    System.out.println("  produceMessage  send   ok   ");
}

对于消息消费者,在以前使用单独的监听器类,编写监听器代码,但是在spring boot 中,使用注解 JmsListener 即可:

@Component
public class Queue_consummer {
    @JmsListener(destination = "${myqueue}")     // 注解监听  
    public void receive(TextMessage textMessage) throws  Exception{
        System.out.println(" ***  消费者收到消息  ***"+textMessage.getText());
    }
}

这些是之前(队列)消息发送者发送的消息

img

2.3 编写消费者项目

2.4 编写主题的消息生产者和消费者项目,运行demo

代码地址:https://github.com/elstic/ActiveMQ

五. 协议

\1. ActiveMQ 支持的协议有 TCP 、 UDP、NIO、SSL、HTTP(S) 、VM

这是activemq 的activemq.xml 中配置文件设置协议的地方

 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumCon    nections=1000&wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnect    ions=1000&wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConn    ections=1000&wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnect    ions=1000&wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnection    s=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

默认是使用 openwire 也就是 tcp 连接

默认的Broker 配置,TCP 的Client 监听端口 61616 ,在网络上传输数据,必须序列化数据,消息是通过一个 write protocol 来序列化为字节流。默认情况 ActiveMQ 会把 wire protocol 叫做 Open Wire ,它的目的是促使网络上的效率和数据快速交互 。

使用tcp 的一些优化方案:

​ tcp://hostname:port?key=value

它的参数详情参考:http://activemq.apache.org/tcp-transport-reference

\2. NIO 协议为ActiveMQ 提供更好的性能

适合NIO 使用的场景:

​ 1 当有大量的Client 连接到Broker 上 , 使用NIO 比使用 tcp 需要更少的线程数量,所以使用 NIO

​ 2 可能对于 Broker 有一个很迟钝的网络传输, NIO 的性能高于 TCP

连接形式:

​ nio://hostname:port?key=value

各种协议对比 : http://activemq.apache.org/configuring-version-5-transports.html

img

修改 activemq.xml 使之支持 NIO 协议:

 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumCon    nections=1000&wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnect    ions=1000&wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConn    ections=1000&wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnect    ions=1000&wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnection    s=1000&wireFormat.maxFrameSize=104857600"/>
 <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>   <!-- 这是添加的 -->
 </transportConnectors>

img

而使用 NIO 协议,代码修改量极小,只需同时将消息生产者和消费者的 URL 修改即可:

//public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
public static final String ACTIVEMQ_URL = "nio://192.168.17.3:61618";

修改之后即可正确运行

\3. NIO 增强

URI 格式以 nio 开头,表示这个端口使用 tcp 协议为基础的NIO 网络 IO 模型,但这样设置让它只支持 tcp 、 nio 的连接协议。如何让它支持多种协议?

img

Starting with version 5.13.0, ActiveMQ supports wire format protocol detection. OpenWire, STOMP, AMQP, and MQTT can be automatically detected. This allows one transport to be shared for all 4 types of clients.

使用 : auto+nio+ssl

官网介绍 : http://activemq.apache.org/auto

使用 auto 的方式就相当于四合一协议 : STOMP AMQP MQTT TCP NIO

<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>

auto 就像是一个网络协议的适配器,可以自动检测协议的类型,并作出匹配

<transportConnector name="auto" uri="auto://localhost:5671?auto.protocols=default,stomp"/>

配置文件修改:

        …… 
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000
&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corelPoolSize=20
&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50"/>

连接:

img

消息发送成功

img

同样代码只需修改 URI

public static final String ACTIVEMQ_URL = "nio://192.168.17.3:61608";

对于 NIO 和 tcp 的代码相同,但不代表使用其他协议代码相同,因为底层配置不同,其他协议如果使用需要去修改代码

六. ActiveMQ 的可持久化

将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。

官网 : http://activemq.apache.org/persistence

ActiveMQ 支持的消息持久化机制: 带赋值功能的 LeavelDB 、 KahaDB 、 AMQ 、 JDBC

持久化就是高可用的机制,即使服务器宕机了,消息也不会丢失

AMQ 是文件存储形式,写入快、易恢复 默认 32M 在 ActiveMQ 5.3 之后不再适用

KahaDB : 5.4 之后基于日志文件的持久化插件,默认持久化插件,提高了性能和恢复能力

KahaDB 的属性配置 : http://activemq.apache.org/kahadb

它使用一个事务日志和 索引文件来存储所有的地址

img

db-<数字>.log 存储数据,一个存满会再次创建 db-2 db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档

db.data 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-.log 里的消息

一点题外话:就像mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。

db.free 存储空闲页 ID 有时会被清除

db.redo 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引

lock 顾名思义就是锁

​ 四类文件+一把锁 ==》 KahaDB

LeavelDB : 希望作为以后的存储引擎,5.8 以后引进,也是基于文件的本地数据存储形式,但是比 KahaDB 更快

它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自带的 LeavelDB 索引

题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的LeavelDB 比LeavelDB 更性能更优越,但需要基于 Zookeeper 所以这些官方还没有定论,任就使用 KahaDB

JDBC : 有一部分数据会真实的存储到数据库中

使用JDBC 的持久化,

①修改配置文件,默认 kahaDB

修改之前:

<persistenceAdapter>
       <kahaDB directory="${activemq.data}/kahadb"/>  
 </persistenceAdapter>

修改之后:

<persistenceAdapter>
      <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
 </persistenceAdapter>

②在activemq 的lib 目录下添加 jdbc 的jar 包 (connector.jar 我使用5.1.41 版本)

③ 修改配置文件 : activemq.xml 使其连接自己windows 上的数据库,并在本地创建名为activemq 的数据库

img

④ 让linux 上activemq 可以访问到 mysql ,之后产生消息。

​ ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock

​ activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存

​ activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker

​ activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中

点对点会在数据库的数据表 ACTIVEMQ_MSGS 中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除

但是对于主题,订阅方式接受到的消息,会在 ACTIVEMQ_MSGS 存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息 。 并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。

坑:

img

JDBC 改进: 加入高速缓存机制 Journal

高速缓存在 activemq.xml 中的配置:

img

持久化消息是指:

MQ 所在的服务器down 了消息也不会丢失

持久化机制演化过程:

从最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事务)附件,并且同步推出了关系型数据库的存储方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默认持久化方案),后来ActiveMQ 从5.8开始支持LevelDB ,现在5.9 提供了 Zookeeper + LevelDB 的集群化方案。

ActiveMQ 消息持久化机制有:

AMQ 基于日志文件

KahaDB 基于日志文件,5.4 之后的默认持久化

JDBC 基于第三方数据库

LevelDB : 基于文件的本地数据库存储,从5.8 之后推出了LevelDB 性能高于 KahaDB

ReplicatedLevelDB Store 从5.8之后提供了基于LevelDB 和Zookeeper 的数据复制方式,用于Master-slave方式的首数据复制选方案

但是无论使用哪种持久化方式,消息的存储逻辑都一样

七. 多节点集群

大概流程:

img

如何保证高可用 ==》 搭建集群

ZooKeeper + Replicated LevelDB Store ==》

​ 集群 http://activemq.apache.org/replicated-leveldb-store

img

​ 这幅图的意思就是 当 Master 宕机后,zookeper 监测到没有心跳信号, 则认为 master 宕机了,然后选举机制会从剩下的 Slave 中选出一个作为 新的 Master

zookeper : 3.4.9 搭建zookeper 集群,搭建 activemq 集群

集群搭建: 新建 /mq_cluster 将原始的解压文件复制三个,修改端口 (jetty.xml)

img

增加IP 到域名的映射(/etc/hosts 文件)

img

修改 为相同的borkername

img

改为 replica levelDB (3个都配,这里列举一个)

<persistenceAdapter>
    <replicatedLevelDB
      directory="{activemq.data}/leveldb"
      replicas="3"
      bind="tcp://0.0.0.0:63631"
      zkAddress="localhost:2191,localhost:2192,localhost:2193"
      zkPassword="123456"
      sync="local_disk"
      zkPath="/activemq/leveldb-stores"
      hostname="wh-mq-server"
      />
  </persistenceAdapter>

改端口 02 节点 =》 61617 03 节点 =》 61618

img

​ 想要启动replica leavel DB 必须先启动所有的zookeper 服务,zookeper 的单机伪节点安装这里不细说了,主要说zookeper 复制三份后改配置文件,并让之自动生成 myid 文件,并将zk的端口改为之前表格中对应的端口 。这是conf 下的配置文件

img

其具体配置为:

tickTime=2000
initLimit=10
syncLimit=5
clientPort=2191    // 自行设置
server.1=192.168.17.3:2888:3888
server.2=192.168.17.3:2887:3887
server.3=192.168.17.3:286:3886
dataDir=/zk_server/data/log1    // 自行设置

我设置了三个,此时方便起见可以写批处理脚本

#!/bin/sh             // 注意这个必须写在第一行
cd /zk_server/zk_01/bin
./zkServer.sh  start
cd /zk_server/zk_02/bin
./zkServer.sh  start
cd /zk_server/zk_03/bin
./zkServer.sh  start                                                                                            

​ 编写这个 zk_batch.sh 之后, 使用

chmod  700    zk_batch.sh

命令即可让它变为可执行脚本, ./zk_batch.sh start 即可 (即启动了三个zk 的服务)

同理可以写一个批处理关闭zk 服务的脚本和 批处理开启mq 服务 关闭 mq 服务的脚本。

完成上述之后连接zk 的一个客户端

./zkCli.sh -server 127.0.0.1:2191

连接之后:

img

表示连接成功

​ 查看我的三个节点: 我的分别是 0…… 3 …… 4 …… 5

img

查看我的节点状态

get /activemq/leveldb-stores/00000000003

img

此次验证表明 00000003 的节点状态是master (即为63631 的那个mq 服务) 而其余的(00000004 00000005) activemq 的节点是 slave

如此集群顺利搭建成功 !

img

此次测试表明只有 8161 的端口可以使用 经测试只有 61 可以使用,也就是61 代表的就是master

img

测试集群可用性:

首先:

img

修改代码

public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.17.3:61616,tcp://192.168.17.3:61617,
tcp://192.168.17.3:61618)?randomize=false";
public static final String QUEUE_NAME = "queue_cluster";

测试:

img

​ 测试通过连接上集群的 61616

MQ服务收到三条消息:

img

消息接收

img

MQ 服务也将消息出队

img

以上代表集群可以正常使用

此时真正的可用性测试:

杀死 8061 端口的进程 !!!

img

刷新页面后 8161 端口宕掉,但是 8162 端口又激活了

img

img

当 61616 宕机,代码不变发消息 自动连接到 61617 了

img

这样! 集群的可用性测试成功!

八. 终章:面试经验

1> 引入消息队列后 如何保证高可用性

​ 持久化、事务、签收、 以及带复制的 Leavel DB + zookeeper 主从集群搭建

2> 异步投递 Async send

​ 对于一个慢消费者,使用同步有可能造成堵塞,消息消费较慢时适合用异步发送消息

​ activemq 支持同步异步 发送的消息,默认异步。当你设定同步发送的方式和 未使用事务的情况下发持久化消息,这时是同步的。

​ 如果没有使用事务,且发送的是持久化消息,每次发送都会阻塞一个生产者直到 broker 发回一个确认,这样做保证了消息的安全送达,但是会阻塞客户端,造成很大延时 。

​ 在高性能要求下,可以使用异步提高producer 的性能。但会消耗较多的client 端内存,也不能完全保证消息发送成功。在 useAsyncSend = true 情况下容忍消息丢失。

//  开启异步投递
activeMQConnectionFactory.setUseAsyncSend(true);    

img

url 后面加参数

开启ActivemqFactury 的Async 为true

将connection 设Async 为true

​ > 如何在投递快还可以保证消息不丢失 ?

异步发送消息丢失的情况场景是: UseAsyncSend 为 true 使用 producer(send)持续发送消息,消息不会阻塞,生产者会认为所有的 send 消息均会被发送到 MQ ,如果MQ 突然宕机,此时生产者端尚未同步到 MQ 的消息均会丢失 。

故 正确的异步发送方法需要接收回调

同步发送和异步发送的区别就在于 :

同步发送send 不阻塞就代表消息发送成功

异步发送需要接收回执并又客户端在判断一次是否发送

在代码中接收回调的函数 :

activeMQConnectionFactory.setUseAsyncSend(true);
    ……  
 for (int i = 1; i < 4 ; i++) {
         textMessage = session.createTextMessage("msg--" + i);
      textMessage.setJMSMessageID(UUID.randomUUID().toString()+"--  orderr");
     String msgid = textMessage.getJMSMessageID();
            messageProducer.send(textMessage, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    // 发送成功怎么样
                    System.out.println(msgid+"has been successful send ");
                }
                @Override
                public void onException(JMSException e) {
                    // 发送失败怎么样
                    System.out.println(msgid+" has been failure send ");
                }
            });
}    

img

3> 延迟投递和定时投递

① 在配置文件中设置定时器开关 为 true

img

② 代码编写

Java 代码中封装的辅助消息类型 ScheduleMessage

​ 可以设置的 常用参数 如下:

img

long delay = 3 * 1000 ;
long perid = 4 * 1000 ;
int repeat = 7 ;
for (int i = 1; i < 4 ; i++) {
    TextMessage textMessage = session.createTextMessage("delay msg--" + i);
    // 消息每过 3 秒投递,每 4 秒重复投递一次 ,一共重复投递 7 次
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
    messageProducer.send(textMessage);
}

4> ActiveMQ 的消息重试机制 ^_^ 好累啊,快完了,不想手写了,框一些图片吧

img

最多六次还没发出就会

加入DLQ (死信队列)

img

官网介绍 http://activemq.apache.org/redelivery-policy

​ > 死信队列的一些设置

​ 修改,当嫌6 次太多,设置为 3次 // 三次的意思是不计算本来发送的第一次 ,之后再次发送的第三次就被废弃

RedeliveryPolicy  redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

在spring 中使用 死信机制

img

在业务逻辑中,如果一个订单系统没有问题,则使用正常的业务队列,当出现问题,则加入死信队列 ,此时可以选择人工干预还是机器处理 。

死信队列默认是全部共享的,但是也可以设置独立的死信队列

独立的死信队列配置

img

5> 如何保证消息不被重复消费,幂等性的问题

如果消息是做数据库的插入操作,给这个消息一个唯一的主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据 。

如果不是,可以用redis 等的第三方服务,给消息一个全局 id ,只要消费过的消息,将 id ,message 以 K-V 形式写入 redis ,那消费者开始消费前,先去 redis 中查询有没消费的记录即可。

img

项目地址: https://github.com/elstic/ActiveMQ

观察者模式 、 发布订阅者设计模式 :

观察者模式 : 对象间的一对多的依赖关系

何谓观察者模式?观察者模式定义了对象之间的一对多依赖关系,这样一来,当一个对象改变状态时,它的所有依赖者都会收到通知并且自动更新。

在这里,发生改变的对象称之为观察目标,而被通知的对象称之为观察者。一个观察目标可以对应多个观察者,而且这些观察者之间没有相互联系,所以么可以根据需要增加和删除观察者,使得系统更易于扩展。

​ 发布订阅者 : 是观察者模式的一个概念的变种,

发布/订阅者模式与观察者模式主要有以下几个不同点:

  1. 在观察者模式中,主体维护观察者列表,因此主体知道当状态发生变化时如何通知观察者。然而,在发布者/订阅者中,发布者和订阅者不需要相互了解。它们只需在中间层消息代理(或消息队列)的帮助下进行通信。
  2. 在发布者/订阅者模式中,组件与观察者模式完全分离。在观察者模式中,主题和观察者松散耦合。
  3. 观察者模式主要是以同步方式实现的,即当发生某些事件时,主题调用其所有观察者的适当方法。发布服务器/订阅服务器模式主要以异步方式实现(使用消息队列)。
  4. 发布者/订阅者模式更像是一种跨应用程序模式。发布服务器和订阅服务器可以驻留在两个不同的应用程序中。它们中的每一个都通过消息代理或消息队列进行通信。

Author: Juntech
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source Juntech !
评论
  TOC