Loading...
墨滴

oldfarmer

2021/05/13  阅读:65  主题:灵动蓝

RocketMQ 简单入门

RocketMQ 是阿里巴巴使用 Java 开发并开源的一款消息中间件。由于阿里巴巴的本地优势以及 Java 语言的优势,越来越多国内的公司将消息队列从 Kafka 切换到 RocketMQ。由本人最近面过的一些大厂来看,面试中的框架源码部分已经越来越多的向中间件问题迁移,个人认为这也是合乎情理的,毕竟中间件是一种更通用的问题解决思路,而不是像框架一样只局限于语言(当然像 Spring 这种优秀的框架其中有很多的优秀的设计思想值得我们学习)。Java 开发常用的中间件包括消息队列、RPC、注册中心等。个人认为其中消息队列的使用场景、原理都是最容易理解的,所以从消息队列开始了解中间件是个不错的选择。

在 MQ 中, Kafka 目前是基于 Scala 实现,虽然也属于 JVM 系语言,不难理解。但是 Java 实现的 RocketMQ 对于个人来说无疑是更好的选择,遇到问题时能够直接跟进 DEBUG,追寻实现思路,而不用担心被一些其它语言的骚写法弄迷糊。

最近也在网上看到 Apache Pulsar 逐渐兴起。它最大的一个特点是将流消息模型与队列消息模型整合了,所以能够支持两种场景。除此之外,它提供了 Kafka-On-Pulsar(KoP) 组件支持 Kafka 应用程序,用户不用改动客户端的任何 Kafka 代码就能直接使用 Pulsar,算是促进用户尝鲜的一种方式,类似于 TiDB 对 MySQL 的兼容。但是目前中文网上关于 Pulsar 的文章较少,所以暂时先不多做说明。

所以,从本篇开始,我将从认识到实践,再到底层原理,一步步揭开 RocketMQ 消息中间件的内幕。

RocketMQ 与 Kafka 的比较

其实常常讨论的 MQ 包括四种:ActiveMQ、RabbitMQ、Kafka、RocketMQ。

其中 ActiveMQ 由于出现时间较早,性能相对较差,社区目前已经不活跃了,所以不再推荐使用。

RabbitMQ 吞吐量方面稍逊于 Kafka 和 RocketMQ,但是它基于 erlang 开发,性能极好,并发能力强,延时极低,达到微秒级别。但是这也是它的致命缺点,erlang 在国内相当于黑洞语言,能基于 erlang 对 RabbitMQ 做改造与定制需求的公司少之又少,所以在国内不那么活跃。

所以对于剩下的两种,我们来进行一个全面的对比。

首先抛出一个个人观点:由于 Kafka 天然适合大数据领域及日志采集,所以为了方便,许多国内公司同时也选择了它作为 MQ 组件。

回到正题,

  • 吞吐量

    Kakfa 单机写入 10 字节大小消息能达到百万/秒,而同等条件下 RocketMQ 最高为 12 万条/秒。这两者都能满足要求,毕竟单机消息到达 几万条/秒时,意味着我们该对服务器水平扩展了。

  • 时效性

    Kafka 与 RocketMQ 的延迟都是 ms 级

  • 功能性

    Kafka 支持的功能不多,主要是围绕消息队列收发消息进行处理。而 RocketMQ 则功能毕竟完善,对于消息查询,事务消息等支持更加完善。

综上所述,Kafka 是个相对毕竟纯粹的消息收发系统,毕竟它一开始的目的只是用于日志收集和传输。而 RocketMQ 有更多扩展功能,适用于金融等对于可靠性要求更高的场景,所以这两者的选择主要取决于业务的数据可靠性与复杂度。

RocketMQ 组件介绍

RocketMQ 主要包括四个部分:Producer、Consumer、Name Server、Broker。

  • Producer:消息发布者,通过 Name Server 获取 Broker 集群的路由信息,从而进行消息的投递,投递的过程支持快速失败

  • Consumer:消息消费者。支持以推、拉两种模式对消息进行消费。也支持广播方式的消费

  • Name Server:Topic 路由注册中心,与 Kafka 中 zookeeper 一致(最新版本 Kafka 已经移除与 zk 的绑定),支持 Broker 的动态注册与发现。NameServer 会告诉生产者与消费者整个 Broker 集群的消息。并对 Broker 信息进行管理,提供心跳机制,检测 Broker 是否存活,并保持存活 Broker 的注册信息以提供给 Producer 和 Consumer 路由使用

  • Broker:负责消息的存储、投递的查询以及服务高可用的保证

先对这些组件及它的功能有个印象即可,后续会以组件的每个功能为维度来进行底层源码的分析。

RocketMQ 使用实例

运行依赖配置

访问 RocketMQ Quick Start,按照文档逐步操作。

首先是环境要求:

  1. 64 位操作系统
  2. 64位 JDK 1.8 及以上版本
  3. Maven 3.2.x 及以上版本
  4. Git
  5. 4g 以上空闲磁盘空间

本人使用腾讯云服务器, CentOS 系统搭建。

首先下载 4.8.0-source-release,这是源码包,我们需要解压后,使用 maven 编译。

操作命令如下:

> unzip rocketmq-all-4.8.0-source-release.zip
> cd rocketmq-all-4.8.0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0

此时看到的文件夹目录如下图所示:

directory

其中 conf 目录下存放着 broker 相关的配置文件,包括集群相关的配置文件。bin 目录下存放着 nameserver、broker 等组件的脚本。

在官方文档中,下一步就可以开始启动 nameserver 了,但是 bin/runserver.sh 与 bin/runbroker.sh 中默认配置的都是以 G 为单位来配置堆内存的,对于弱鸡的云服务器来说,显然是没有这么充裕的资源的。所以我们需要修改其中的部分虚拟机配置参数。

更改 bin/runserver.sh,将其中的堆大小相关参数修改为

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

修改 bin/runbroker.sh,将其中的堆大小相关参数修改为

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

在更改完运行脚本之后,对于阿里云、腾讯云等用户,需要在 broker 配置文件中指定 brokerIP 为公网 IP,这样在其他机器上也能访问到该 broker,修改 conf/broker.conf 为以下内容

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 允许自动创建不存在的 Topic,生产环境建议关闭
autoCreateTopicEnable=true
# 对于云服务器部署 RocketMQ 用户,需要将此处指定为公网 IP
brokerIP1=111.231.26.137
# 指定连接的 nameserver 地址,启动时不需要再加相关参数
namesrvAddr=111.231.26.137:9876

Linux 运行 RocketMQ

接下来,按照官网 Quick Start 以后台方式运行 namesrv 以及 broker。

# 后台运行 namesrv
nohup sh bin/mqnamesrv &
# 查看 namesrv 进程是否正常启动
jps
# 查看 namesrv 启动日志是否正常
tail -f ~/log/rockegmqlog/namesrv.log

运行成功结果如下图所示:

run namesrv

Namesrv.log 默认路径就存在 ~/log/rockegmqlog/ 下,运行时可能存在 RocketMQ 没有权限在指定的目录下创建 namesrv.log 的情况,可以先用 admin 用户创建相关目录之后,再运行 namesrv.sh。

接下来启动 broker

# 后台运行 mqbroker,-c 命令用于指定本次启动使用的配置文件
nohup sh bin/mqbroker -c conf/broker.conf &
# jps 查看 broker 进程是否正常启动
jps
# 检查 broker 启动日志
tail -f ~/logs/rocketmqlogs/broker.log 

运行成功结果如下:

run broker

启动成功之后,接下来应该模拟发送与接收消息。不过在此之前,我们需要做两件事。

第一是修改 bin/tools.sh 中的虚拟机运行参数,将堆内存相关设置为如下:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"

第二是需要设置环境变量 NAMESRV_ADDR。使用 export 来设置,可以确保只有本次登录时,该环境变量有效。

export NAMESRV_ADDR=localhost:9876

接下来使用测试来发送消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

其中的 Producer 类会尝试发送 100 条消息,执行情况如下

send test success
send test success

看到 SendResult [sendStatus=SEND_OK... 表示消息发送成功。

使用消费者进行消费:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
consumer test success
consumer test success

看到 Receive New Messages: [MessageExt [ 表示消费成功了。

除了运行之外,我们还需要知道如何优雅地结束 namesrv 与 broker。之前使用过 kill -9 杀死对应的进程,导致出现了很多网上都查不到解决方案的错误,所以两个字:优雅!

先关闭 broker,再关闭 namesrv

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

>
 sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

之后会一直运行云服务器上的 namesrv 与 broker ,应用程序都会连接到云服务器上部署的 RocketMQ 上

应用发送消息

创建一个 SpringBoot 应用,在 pom.xml 中加入 RocketMQ 依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

先创建一个 普通 Java 程序测试发送:

package me.oldfarmer.lab.rocketmq.producer;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

public class Producer {

    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("custom-producer");
        producer.setNamesrvAddr("111.231.26.137:9876");
        producer.start();

        Message message = new Message("topicA""Hello world".getBytes(StandardCharsets.UTF_8));
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);

        producer.shutdown();
    }
}

代码都比较简单,所以略去了注释。

运行结果如下:

producer main result
producer main result

发送成功。

创建一个消费者

package me.oldfarmer.lab.rocketmq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("custom_consumer");
        consumer.setNamesrvAddr("111.231.26.137:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//从最后开始消费

        consumer.subscribe("topic""*"); //消费 topic 下所有消息
        // 注册消息监听器,在收到消息时进行处理
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            MessageExt messageExt = list.get(0);
            try {
                System.out.println(messageExt);
            } catch (Exception e) {
                e.printStackTrace();
                // 需要根据消息处理结果告知 broker 对该消息处理方式
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

运行结果如下:

consumer main result
consumer main result

这里总共打印了四条是之前测试多发送了几次。

结果说明消费者在消费到消息之后按照,监听器对消息进行了处理。


到这里 RocketMQ 的简单入门就结束啦,相对来说比较容易。之后会根据主要的步骤来解析每个组件起到了什么样的作用。

oldfarmer

2021/05/13  阅读:65  主题:灵动蓝

作者介绍

oldfarmer