作者:施自扬 微信号:shiziyangwx
rocketMQ 使用idea源码解析启动
1. git clone 代码
链接: githup地址
2. 使用idea打开
各个代码包的功能
- acl:权限控制模块
- broker: broker 模块(broke 启动进程)
- client :消息客户端,包含消息生产者、消息消费者相关类
- common :公共包
- dev :开发者信息(非源代码)
- distribution :部署实例文件夹(非源代码)
- docs:很多官方文档。虽然不是源码,但很重要。强烈建议通读一遍。
- example: RocketMQ 例代码
- filter :消息过滤相关基础类
- logappender:日志实现相关类
- namesrv:NameServer实现相关类(NameServer启动进程)
- openmessageing:消息开放标准
- remoting:远程通信模块,基于Netty。
- srcutil:服务工具类
- store:消息存储实现相关类
- style:checkstyle相关实现
- test:测试相关类
- tools:工具类,监控命令相关实现类
3.启动NameSrv
- 先建一个conf目录,存放需要的配置文件。
- 将rocketmq-distribution->conf中的broker.conf,logback_broker.xml,logback_namesrv.xml复制到创建的conf下
- 启动配置 将新创建的conf配置到变量上
- 启动完成
启动Broker
- 修改conf文件
任意目录创建一个data文件,我直接在broker项目下创建了 更改broker.conf配置文件
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# namesrvAddr地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
#### 配置为上面设置的文件夹路径
# 存储路径
storePathRootDir=/Users/shiziyang/IdeaProjects/rocketmq-all-4.9.2/broker/data/dataDir
# commitLog路径
storePathCommitLog=/Users/shiziyang/IdeaProjects/rocketmq-all-4.9.2/broker/data/dataDir/commitlog
# 消息队列存储路径
storePathConsumeQueue=/Users/shiziyang/IdeaProjects/rocketmq-all-4.9.2/broker/data/dataDir/consumequeue
# 消息索引存储路径
storePathIndex=/Users/shiziyang/IdeaProjects/rocketmq-all-4.9.2/broker/data/dataDir/index
# checkpoint文件路径
storeCheckpoint=/Users/shiziyang/IdeaProjects/rocketmq-all-4.9.2/broker/data/dataDir/checkpoint
# abort文件存储路径
abortFile=/Users/shiziyang/IdeaProjects/rocketmq-all-4.9.2/broker/data/dataDir/abort
- 启动配置
第一个框为上边修改的broker.conf配置文件路径 - 启动完成
- 验证
发送消息 -配置namesrvAddr -启动main方法是否能发送成功 消费消息 -配置namesrvAddr -启动main方法是否能消费到
springboot引入rocketMQ
1.创建一个springboot项目
2.pom引入
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
3.发送消息
package com.mq.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MqMessageController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping(value = "/pushMessage",method = RequestMethod.GET)
public String get(@RequestParam("id") int id) {
rocketMQTemplate.convertAndSend("test-topic", "你好,rocketmq:" + id);
return "success";
}
}
4.消费消息
package com.mq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "test-topic",consumerGroup = "my-consumer-group")
@Slf4j
public class MqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
5.验证
监控界面RocketMQ-Console使用idea源码解析启动
1.下载代码
链接: RocketMQ-Console的githup地址
2.使用idea打开
3.更改配置文件
4.启动
5.验证
浏览器访问:http://localhost:8080
|