在 Spring 生态中玩转 RockMQ (it-ebooks) (Z-Library)

Author: it-ebooks

教育

No Description

📄 File Format: PDF
💾 File Size: 1.5 MB
32
Views
0
Downloads
0.00
Total Donations

📄 Text Preview (First 20 pages)

ℹ️

Registered users can read the full content for free

Register as a Gaohf Library member to read the complete e-book online for free and enjoy a better reading experience.

📄 Page 1
(This page has no text content)
📄 Page 2
RocketMQ中国社区钉钉群 微信扫码关注 阿里云开发者“藏经阁” 欢迎各位开发者进群交流、勘误 “阿里巴巴云原生公众号” 海量电子书免费下载
📄 Page 3
目录 开篇:在 Spring 生态中玩转 RocketMQ 4 RocketMQ Spring 最初的故事:罗美琪和春波特的故事... 13 RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现 30 方法一:使用 rocketmq-spring-boot- starter 来配置、发送和消费 Rock etMQ消息 39 方法二:Spring Cloud Stream 体系及原理介绍:spring-cloud-stream- binder-rocektmq 53 方法三:Spring Cloud Bus 消息总线介绍 70
📄 Page 4
开篇:在Spring 生态中玩转RocketMQ < 4 开篇:在 Spring 生态中玩转RocketMQ Apache RocketMQ 作为阿里开源的业务消息的首选,通过双 11 业务打磨,在消息 和流处理领域被广泛应用。而微服务生态 Spring 框架也是业务开发中最受开发者欢迎的 框架之一,两者的完美契合使得 RocketMQ 成为 Spring Messaing 实现中最受欢迎的消 息实现。 在Spring生态中使用RocketMQ到底有多少种方式?他们各自适用于什么场景?各 自有什么优劣势? 如何开始实战?本书将一一解答。 我们先会带领各位开发者:  回顾罗美琪(RocketMQ)和春波特(SpringBoot)故事开始的时候,rocketmq- spring 经过 6个多月的孵化,作为 Apache RocketMQ 的子项目正式毕业。  回顾 rocketmq-spring 毕业后的两年,是如何成为 Spring 生态中最受欢迎的 messaging 实现的? 最后将通过图文和实操地方式带来给位开发者玩转在Spring生态中使用RocketMQ 的三种主流方式。
📄 Page 5
5 > 开篇:在 Spring 生态中玩转RocketMQ 所有动手实操的部分大家可以登录 start.aliyun.com 知行动手实验室免费体验。 开卷有益,希望各位开发者通过阅读本书、动手实操有所收获。 一、RocketMQ与 Spring 的碰撞 在介绍 RocketMQ 与 Spring 故事之前,不得不提到 Spring 中的两个关于消息的框 架,Spring Messaging 和 Spring Cloud Stream。它们都能够与 Spring Boot 整合并 提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息 驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员 可以有更多的精力关注于核心业务逻辑的处理。 二、Spring Messaging Spring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统 集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异 步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能 集。 在与 Spring Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应 的消息传递系统进行集成。
📄 Page 6
开篇:在Spring 生态中玩转RocketMQ < 6 单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标 准,对消息发送端和消息接收端的模式进行规定,比如消息 Messaging 对应的模型就 包括一个消息体 Payload 和消息头 Header。不同的消息中间件提供商可以在这个模式 下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消 费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱 动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。 在 Apache RocketMQ 生态中,RocketMQ-Spring-Boot-Starter(下文简称 RocketMQ-Spring)就是一个支持 Spring Messaging API 标准的项目。该项目把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费,也支 持扩展出RocketMQ 原生 API 来支持更加丰富的消息类型。在 RocketMQ-Spring 毕业初 期,RocketMQ 社区同学请 Spring 社区的同学对 RocketMQ-Spring 代码进行 review,
📄 Page 7
7 > 开篇:在 Spring 生态中玩转RocketMQ 引出一段罗美琪(RocketMQ)和春波特(Spring Boot)故事的佳话[1],著名 Spring 布道师 Josh Long 向国外同学介绍如何使用 RocketMQ-Spring 收发消息[2]。 RocketMQ-Spring 也在短短两年时间超越 Spring-Kafka 和 Spring-AMQP(注:两者均由 Spring 社区维护),成为 Spring Messaging 生态中最活跃的消息项目。 三、Spring Cloud Stream Spring Cloud Stream结合了 Spring Integration 的注解和功能,它的应用模型如 下:
📄 Page 8
开篇:在Spring 生态中玩转RocketMQ < 8 Spring Cloud Stream框架中提供一个独立的应用内核,它通过输入(@Input)和输 出(@Output)通道与外部世界进行通信,消息源端(Source)通过输入通道发送消息,消费 目标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专用的 Binder 实现与 外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行 编程,而不需要关心运行时具体的Binder 绑定的消息中间件。 在运行时,Spring Cloud Stream 能够自动探测并使用在 classpath 下找到的 Binder。这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在 构建时包含进不同的 Binder。在更加复杂的使用场景中,也可以在应用中打包多个 Binder 并让它自己选择 Binder,甚至在运行时为不同的通道使用不同的Binder。 Binder 抽象使得 Spring Cloud Stream 应用可以灵活的连接到中间件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置可以 通过外部配置的属性和 Spring Boot 支持的任何形式来提供(包括应用启动参数、环境 变量和 application.yml 或者 application.properties 文件),部署人员可以在运行时动 态选择通道连接 destination(例如,RocketMQ 的 topic 或者 RabbitMQ 的 exchange)。 Spring Cloud Stream 屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。 Spring 官方实现了 Rabbit binder 和 Kafka Binder。Spring Cloud Alibaba 实现了 RocketMQ Binder[3],其主要实现原理是把发送消息最终代理给了 RocketMQ-Spring 的 RocketMQTemplate,在消费端则内部会启动 RocketMQ-Spring Consumer Container 来接收消息。以此为基础,Spring Cloud Alibaba 还实现了 Spring Cloud Bus RocketMQ,用户可以使用 RocketMQ 作为 Spring Cloud 体系内的消息总线,来
📄 Page 9
9 > 开篇:在 Spring 生态中玩转RocketMQ 连接分布式系统的所有节点。通过 Spring Cloud Stream RocketMQ Binder, RocketMQ 可以与 Spring Cloud 生态更好的结合。比如与 Spring Cloud Data Flow、 Spring Cloud Funtion 结合,让 RocketMQ 可以在 Spring 流计算生态、Serverless( FaaS)项目中被使用。 如今 Spring Cloud Stream RocketMQ Binder 和 Spring Cloud Bus RocketMQ 做为 Spring Cloud Alibaba 的实现已登陆 Spring 的官网[4],Spring Cloud Alibaba 也 成为 Spring Cloud 最活跃的实现。 四、如何在 Spring 生态中选择 RocketMQ实现? 通过介绍 Spring 中的消息框架,介绍了以 RocketMQ 为基础与 Spring 消息框架结 合的几个项目,主要是 RocketMQ-Spring、Spring Cloud Stream RocketMQ Binder、 Spring Cloud Bus RocketMQ、Spring Data Flow 和 Spring Cloud Function。它们 之间的关系可以如下图表示。
📄 Page 10
开篇:在Spring 生态中玩转RocketMQ < 10 如何在实际业务开发中选择相应项目进行使用?下表列出了每个项目的特点和使用 场景。
📄 Page 11
11 > 开篇:在 Spring 生态中玩转RocketMQ 项目 特点 使用场景 RocketMQ-Spring 1.作为起步依赖,简单引入一个包就能在 Spring 生态用到 RocketMQ 客户端的所有功能。 2.利用了大量自动配置和注解简化了编程模型, 并且支持 Spring Messaging API 3.与 RocketMQ 原生 Java SDK 的功能完全对 齐 适合在 Sp r i n g Boot中使用Rock etMQ 的用户,希 望能用到 Rocket MQ原生java客户 端的所有功能,并 通过 Spring 注解 和自动配置简化 编程模型。 Spring Cloud Stream RocketMQ Binder 1.屏蔽底层 MQ 实现细节,上层 Spring Cloud Stream 的 API 是统一的。如果想从 Kafka 切到 RocketMQ,直接改个配置即可。 2.与 Spring Cloud 生态整合更加方便。比如 S pring Cloud Data Flow,这上面的流计算都是 基于 Spring Cloud Stream;Spring Cloud B us 消息总线内部也是用的 Spring Cloud Stre am。 3.Spring Cloud Stream 提供的注解,编程体验 都是非常棒。 在代码层面能完 全屏蔽底层消息 中间件的用户,并 且希望能项目能 更好的接入 Sprin g Cloud 生态(S pring Cloud Dat a Flow、Spring Cloud Funtcion 等)。
📄 Page 12
开篇:在Spring 生态中玩转RocketMQ < 12 项目 特点 使用场景 Spring Cloud Bus RocketMQ 将 RocketMQ 作为事件的“传输器”,通过发 送事件(消息)到消息队列上,从而广播到订阅 该事件(消息)的所有节点上。完成事件的分发 和通知。 在 Spring 生态中 希望用RocketMQ 做消息总线的用 户,可以用在应用 间事件的通信,配 置中心客户端刷 新等场景 Spring Cloud Data Flow 以 Source/Processor/Sink 组件进行流式任务 处理。RocketMQ 作为流处理过程中的中间存储 组件 流处理,大数据处 理场景 Spring Cloud Function 消息的消费/生产/处理都是一次函数调用,融合 Java 生态的 Function 模型 Serverless 场景 RocketMQ 作为业务消息的首选,在消息和流处理领域被广泛应用。而微服务生态 Spring 框架也是业务开发中最被,两者的完美契合使得 RocketMQ 成为 Spring Messaing 实现中最受欢迎的消息实现。书的后半部分讲给各位开发者详细讲述在 Spring 生态中使用 RocketMQ 的三种主流的方式。
📄 Page 13
13 > RocketMQ Spring 最初的故事:罗美琪和春波特的故事... RocketMQ Spring 最初的故事:罗美琪 和春波特的故事... 作者:辽天 rocketmq-spring 经过 6 个多月的孵化,作为 Apache RocketMQ 的子项目正式毕 业,发布了第一个 Release 版本 2.0.1。这个项目是把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotat ion 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费。 在项目发布阶段我们很荣幸的邀请了 Spring 社区的原创人员对我们的代码进行了 Review,通过几轮 slack 上的深入交流感受到了 Spring 团队对开源代码质量的标准,对 SpringBoot 项目细节的要求。本文是对 Review 和代码改进过程中的经验和技巧的总结, 希望从事 Spring Boot 开发的同学有帮助。我们把这个过程整理成 RocketMQ 社区的贡 献者罗美琪和 Spring 社区的春波特(SpringBoot)的故事。 一、故事的开始 故事的开始是这样的,罗美琪美眉有一套 RocketMQ 的客户端代码,负责发送消息 和消费消息。早早的听说春波特小哥哥的大名,通过 Spring Boot 可以把自己客户端调 用变得非常简单,只使用一些简单的注解(annotation)和代码就可以使用独立应用的方 式启动,省去了复杂的代码编写和参数配置。
📄 Page 14
RocketMQ Spring 最初的故事:罗美琪和春波特的故事... < 14 聪明的她参考了业界已经实现的消息组件的 Spring 实现了一个 RocketMQ Spring 客户端: 1.需要一个消息的发送客户端,它是一个自动创建的 Spring Bean,并且相关属性要 能够根据配置文件的配置自动设置, 命名它为: RocketMQTemplate,同时让它封装发 送消息的各种同步和异步的方法。 @Resourceprivate RocketMQTemplate rocketMQTemplate; ... SendResult sendResult = rocketMQTemplate.syncSend(xxxTopic, "Hello, World!"); 2.需要消息的接收客户端,它是一个能够被应用回调的 Listener, 来将消费消息回调 给用户进行相关的处理。 @Service@RocketMQMessageListener(topic = "xxx", consumerGroup = "xxx_consumer") public class StringConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.printf("------- StringConsumer received: %s \n", message); } } 特别说明一下:这个消费客户端 Listener 需要通过一个自定义的注解。 @RocketMQMessageListener 来标注,这个注解的作用有两个:  定义消息消费的配置参数(如: 消费的 topic, 是否顺序消费,消费组等);
📄 Page 15
15 > RocketMQ Spring 最初的故事:罗美琪和春波特的故事...  可以让spring-boot在启动过程中发现标注了这个注解的所有Listener, 并进行初始 化,详见 ListenerContainerConfiguration 类及其实现 SmartInitializingSingleton 的接口方法 afterSingletonsInstantiated()。 通过研究发现,Spring-Boot 最核心的实现是自动化配置(auto configuration),它 需要分为三个部分:  AutoConfiguration 类,它由@Configuration 标注,用来创建 RocketMQ 客户端所 需要的 SpringBean,如上面所提到的 RocketMQTemplate 和能够处理消费回调 L i s t e n e r 的容器,每个 L i s t e n e r 对应一个容器 Sp r i n g B e a n 来启动 MQPushConsumer,并将来将监听到的消费消息并推送给 Listener 进行回调。可参 考 RocketMQ AutoConfiguration.java (编者注: 这个是最终发布的类,没有 review 的痕迹啦)  上面定义的 Configuration 类,它本身并不会“自动”配置,需要由 META-INF/ spring.factories 来声明,可参考 spring.factories 使用这个META 配置的好处是上 层用户不需要关心自动配置类的细节和开关,只要 classpath 中有这个META-INF 文件和 Configuration 类,即可自动配置。  另外,上面定义的 Configuration 类,还定义了@EnableConfiguraitonProperties 注解来引入 ConfigurationProperties 类,它的作用是定义自动配置的属性,可参考 RocketMQProperties.java 上层用户可以根据这个类里定义的属性来配置相关的属 性文件(即 META-INF/application.properties 或 META-INF/application.yaml) 二、故事的发展
📄 Page 16
RocketMQ Spring 最初的故事:罗美琪和春波特的故事... < 16 罗美琪美眉按照这个思路开发完成了 RocketMQ SpringBoot 封装并形成了 starter 交给社区的小伙伴们试用,nice,大家使用后反馈效果不错。但是还是想请教一下专业 的春波特小哥哥,看看他的意见。 春波特小哥哥相当的负责地对罗美琪的代码进行了 Review, 首先他抛出了两个链 接: https://github.com/spring-projects/spring-boot/wiki/Building-On-Spring-Boot https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features -developing-auto-configuration.html 然后解释道:"在 Spring Boot 中包含两个概念: auto-configuration 和 starter- POMs, 它们之间相互关联,但是不是简单绑定在一起的: a. auto-configuration 负责响应应用程序的当前状态并配置适当的 Spring Bean。 它放在用户的CLASSPATH中结合在CLASSPATH中的其它依赖就可以提供相关的功能; b. Starter-POM 负责把 auto-configuration 和一些附加的依赖组织在一起,提供开 箱即用的功能,它通常是一个maven project, 里面只是一个 POM文件,不需要包含任 何附加的 classes 或 resources. 换句话说,starter-POM 负责配置全量的 classpath, 而 auto-configuration 负责具 体的响应(实现);前者是 total-solution, 后者可以按需使用。
📄 Page 17
17 > RocketMQ Spring 最初的故事:罗美琪和春波特的故事... 你现在的系统是单一的一个module 把 auto-configuration 和 starter-POM 混在了 一起,这个不利于以后的扩展和模块的单独使用。" 罗美琪了解到了区分确实对日后的项目维护很重要,于是将代码进行了模块化 |--- rocketmq-spring-boot-parent 父 POM |--- rocketmq-spring-boot auto-configuraiton 模块 |--- rocketmq-spring-stater starter 模块 (实际上只包含一个 pom.xml 文 件) |--- rocketmq-spring-samples 调用 starter 的示例样本 "很好,这样的模块结构就清晰多了",春波特小哥哥点头,"但是这个 AutoConfiguration 文件里的一些标签的用法并不正确,帮你注释一下,另外,考虑到 Spring 官方到明年 8 月 Spring Boot 1.X 将不再提供支持,所以建议实现直接支持 Spring Boot 2.X" @Configuration @EnableConfigurationProperties(RocketMQProperties.class) @ConditionalOnClass(MQClientAPIImpl.class) @Order ~~春波特: 这个类里使用Order 很不合理呵,不建议使用,完全可以通过其他方式控制 runtime 是 Bean 的构建顺序 @Slf4j
📄 Page 18
RocketMQ Spring 最初的故事:罗美琪和春波特的故事... < 18 public class RocketMQAutoConfiguration { @Bean @ConditionalOnClass(DefaultMQProducer.class) ~~春波特: 属性直接使用类是不科学的,需 要用(name="类全名") 方式,这样在类不在 classpath 时,不会抛出 CNFE @ConditionalOnMissingBean(DefaultMQProducer.class) @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"nameServer", "producer.g roup"}) ~~春波特: nameServer 属性名要写成 name-server [1] @Order(1) ~~春波特: 删掉呵 public DefaultMQProducer mqProducer(RocketMQPropert ies rocketMQProperties) { ... } @Bean @ConditionalOnClass(ObjectMapper.class) @ConditionalOnMissingBean(name = "rocketMQMessageObjectMapper") ~~春波特: 不建 议与具体的实例名绑定,设计的意图是使用系统中已经存在的ObjectMapper, 如果没有,则在这里实 例化一个,需要改成 @ConditionalOnMissingBean(ObjectMapper.class)
📄 Page 19
19 > RocketMQ Spring 最初的故事:罗美琪和春波特的故事... public ObjectMapper rocketMQMessageObjectMapper() { return new ObjectMapper(); } @Bean(destroyMethod = "destroy") @ConditionalOnBean(DefaultMQProducer.class) @ConditionalOnMissingBean(name = "rocketMQTemplate") ~~春波特: 与上面一样 @Order(2) ~~春波特: 删掉呵 public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, @Autowired(required = false) ~~春波特: 删掉 @Qualifier("rocketMQMessageObjectMapper") ~~春波特: 删掉,不要与具体实例绑定 ObjectMapper objectMapper) { RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); rocketMQTemplate.setProducer(mqProducer); if (Objects.nonNull(objectMapper)) {
📄 Page 20
RocketMQ Spring 最初的故事:罗美琪和春波特的故事... < 20 rocketMQTemplate.setObjectMapper(objectMapper); } return rocketMQTemplate; } @Bean(name = RocketMQConfigUtils.ROCKETMQ_TRANSACTION_ANNOTATION_PROCESS OR_BEAN_NAME) @ConditionalOnBean(TransactionHandlerRegistry.class) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) ~~春波特: 这个 bean(RocketMQTransacti onAnnotationProcessor)建议声明成 static 的,因为这个 RocketMQTransactionAnnotationProcess or 实现了 BeanPostProcessor 接口,接口里方法在调用的时候(创建 Transaction 相关的 Bean 的时候) 可以直接使用这个 static 实例,而不要等到这个Configuration 类的其他的 Bean 都构建好 [2] public RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor( TransactionHandlerRegistry transactionHandlerRegistry) { return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry); } @Configuration ~~春波特: 这个内嵌的 Configuration 类比较复杂,建议独立成一个顶级类, 并且使用
The above is a preview of the first 20 pages. Register to read the complete e-book.

💝 Support Author

0.00
Total Amount (¥)
0
Donation Count

Login to support the author

Login Now
Back to List