本文共 11043 字,大约阅读时间需要 36 分钟。
RocketMQ出了4的版本,而且本身这个mq有事务消息,在分布式的场景中有很好的启发性和作用,而且本身它也是阿里开源到apache的一个项目,从出身还是实力来说都很不错的。
1、新建项目sc-rocketmq,对应的pom.xml如下
4.0.0 spring-cloud sc-rocketmq 0.0.1-SNAPSHOT jar sc-rocketmq http://maven.apache.org org.springframework.boot spring-boot-starter-parent 2.0.4.RELEASE org.springframework.cloud spring-cloud-dependencies Finchley.RELEASE pom import UTF-8 1.8 1.8 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.apache.rocketmq rocketmq-client 4.2.0 com.alibaba fastjson 1.2.44 org.slf4j slf4j-api 1.7.25
Producer单从分类producer的官网doc来看主要分成3种:
DefaultMQProducer
TransactionMQProducer
messagingAccessPoint.createProducer()
本文主要说的是DefaultMQProducer和TransactionMQProducer
默认的producer是DefaultMQProducer,从官方的文档来看,前四个都是对这个producer的运用只是set的值不同而已,而且是很细微的变化而已。
2、新建配置文件application.yml
server: port: 8182spring: application: name: sc-rocketmqrocketmq: consumer: groupName: consumerGroup # 消费者的组名 consumeThreadMin: 2 consumeThreadMax: 5 consumeMessageBatchMaxSize: 10 topics: rocketTopic,rocketTag producer: groupName: producerGroup # 生产者的组名 maxMessageSize: 100 sendMsgTimeout: 1000 retryTimesWhenSendFailed: 3 namesrvAddr: 127.0.0.1:9876 # NameServer地址
3、新建消息生产者类
读取application.yml配置:
package sc.rocketmq.config;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Configuration;@ConfigurationProperties(prefix = "rocketmq.producer")@Configurationpublic class ProducerConfig { private String namesrvAddr; private String groupName; public String getNamesrvAddr() { return namesrvAddr; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } @Override public String toString() { return "ProducerConfig [namesrvAddr=" + namesrvAddr + ", groupName=" + groupName + "]"; }}
消息生产者:
package sc.rocketmq.config;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class ProducerConfigure { Logger log = LoggerFactory.getLogger(ProducerConfigure.class); @Autowired private ProducerConfig producerConfigure; /** * 创建普通消息发送者实例 * * @return * @throws MQClientException */ @Bean// @ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true") public DefaultMQProducer defaultProducer() throws MQClientException { log.info(producerConfigure.toString()); log.info("defaultProducer 正在创建---------------------------------------"); DefaultMQProducer producer = new DefaultMQProducer(producerConfigure.getGroupName()); producer.setNamesrvAddr(producerConfigure.getNamesrvAddr()); producer.setVipChannelEnabled(false); producer.setRetryTimesWhenSendAsyncFailed(10); producer.start(); log.info("rocketmq producer server开启成功---------------------------------."); return producer; }}
4、新建消息消费者类
读取application.yml配置:
package sc.rocketmq.config;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Configuration;@ConfigurationProperties(prefix = "rocketmq.consumer")@Configurationpublic class ConsumerConfig { private String groupName; private String namesrvAddr; public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getNamesrvAddr() { return namesrvAddr; } public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } @Override public String toString() { return "ConsumerConfig [groupName=" + groupName + ", namesrvAddr=" + namesrvAddr + "]"; }}
消息消费者类(抽象类):
package sc.rocketmq.config;import java.util.List;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Configuration;@Configurationpublic abstract class DefaultConsumerConfigure { Logger log = LoggerFactory.getLogger(DefaultConsumerConfigure.class); @Autowired private ConsumerConfig consumerConfig; // 开启消费者监听服务 public void listener(String topic, String tag) throws MQClientException { log.info("开启" + topic + ":" + tag + "消费者-------------------"); log.info(consumerConfig.toString()); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName()); consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr()); consumer.subscribe(topic, tag); // 开启内部类实现监听 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { return DefaultConsumerConfigure.this.dealBody(msgs); } }); consumer.start(); log.info("rocketmq启动成功---------------------------------------"); } // 处理body的业务 public abstract ConsumeConcurrentlyStatus dealBody(List msgs);}
具体消息消费者类:
package sc.rocketmq.service;import java.io.UnsupportedEncodingException;import java.util.List;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.context.ApplicationListener;import org.springframework.context.annotation.Configuration;import org.springframework.context.event.ContextRefreshedEvent;import sc.rocketmq.config.DefaultConsumerConfigure;@Configurationpublic class CustomConsumer extends DefaultConsumerConfigure implements ApplicationListener{ Logger log = LoggerFactory.getLogger(CustomConsumer.class); @Override public void onApplicationEvent(ContextRefreshedEvent arg0) { try { super.listener("t_TopicTest", "Tag1"); } catch (MQClientException e) { log.error("消费者监听器启动失败", e); } } @Override public ConsumeConcurrentlyStatus dealBody(List msgs) { int num = 1; log.info("进入"); for (MessageExt msg : msgs) { log.info("第" + num + "次消息"); try { String msgStr = new String(msg.getBody(), "utf-8"); log.info(msgStr); } catch (UnsupportedEncodingException e) { log.error("body转字符串解析失败"); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}
这个CustomConsumer类实现了ApplicationListener,让他在启动的时候就开启执行DefaultConsumerConfigure的listener方法
5、新建springboot启动类RocketMqApplication.java
package sc.rocketmq;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class RocketMqApplication { public static void main(String[] args) { SpringApplication.run(RocketMqApplication.class, args); }}
6、新建一个Controller,引入消息生产者
package sc.rocketmq.controller;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import com.alibaba.fastjson.JSON;import sc.rocketmq.service.CustomConsumer;@RestControllerpublic class ProducerController { Logger log = LoggerFactory.getLogger(CustomConsumer.class); @Autowired private DefaultMQProducer producer; // @Autowired // private TransactionMQProducer producer;// @Autowired// private TestTransactionListener testTransactionListener; @GetMapping("/msg/product") public void test(String info) throws Exception { Message message = new Message("TopicTest", "Tag1", "12345", "rocketmq测试成功".getBytes()); // 这里用到了这个mq的异步处理,类似ajax,可以得到发送到mq的情况,并做相应的处理 // 不过要注意的是这个是异步的 producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("传输成功"); log.info(JSON.toJSONString(sendResult)); } @Override public void onException(Throwable e) { log.error("传输失败", e); } }); }}
7、验证是否成功
访问http://127.0.0.1:8080/msg/product
可以看到controller产生消息,然后CustomConsumer类的dealBody方法消息消息。
源码:
https://gitee.com/hjj520/spring-cloud-2.x/tree/master/sc-apache-rocketmq
本文作者: java乐园
本文来自云栖社区合作伙伴“”,了解相关信息可以关注“”
转载地址:http://mittx.baihongyu.com/