一、RocketMQ环境搭建
1、软件准备
RocketMQ最新版本:4.5.1 下载地址
2、环境要求
JDK1.8以上 Linux64位系统(CentOS Linux release 7.7.1908) 源码安装需要安装Maven 3.2.x 4G+ free (如果给不到虚拟机4G可用内存 可以修改rocketma配置 下面介绍)
3、安装及启动
-
下载rocketmq #下载
wget https://archive.apache.org/dist/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
-
修改脚本 bin/runserver.sh
bin/runbroker.sh
bin/tools.sh
-
vim runserver.sh -
vim runbroker.sh -
vim tools.sh
-
启动NameServer # 1.启动NameServer
mqnamesrv
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
-
启动Broker # 1.启动Broker
mqbroker -n localhost:9876
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
二、RocketMQ环境测试
1、发送消息
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
2、接收消息
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
3、关闭RocketMQ
# 1.关闭NameServer
mqshutdown namesrv
# 2.关闭Broker
mqshutdown broker
三、RocketMQ和SpringBoot的整合
1、消息生产者
- 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ckw.rocket</groupId>
<artifactId>springboot-rocketmq-produer</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<properties>
<rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
-
配置文件 # rocketmq的nameserver地址
rocketmq.name-server=IP:9876
# 指定生产组名称
rocketmq.producer.group=producer_grp
-
启动类
@SpringBootApplication
public class MQProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MQSpringBootApplication.class);
}
}
- 测试类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MyRocketProducerApplication.class})
public class MyRocketProducerApplicationTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void testSendMessage() {
this.rocketMQTemplate.convertAndSend(
"tp_springboot",
"springboot: hello ckw"
);
}
@Test
public void testSendMessages() {
for (int i = 0; i < 100; i++) {
this.rocketMQTemplate.convertAndSend(
"tp_springboot",
"springboot: hello ckw" + i
);
}
}
}
2、消息消费者
-
添加依赖 同消息生产者 -
配置文件 同消息生产者 -
启动类 同消息生产者 -
消息监听器
@Slf4j
@Component
@RocketMQMessageListener(topic = "tp_springboot", consumerGroup = "consumer_grp")
public class MyRocketListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info(message);
}
}
注意: 可能会启动报错,org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.1:10911> failed 新增配置 vim conf/broker.conf
|