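Contents
I. Installing the RocketMQ console
II. Integrating Spring Cloud Stream with RocketMQ
1. Add the dependency
2. Enable input and output
3. The key step: the configuration file
4. Main class
5. Controller
6. Message consumer
7. Message producer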
The RocketMQ installation steps are omitted here.
Start RocketMQ (on Windows, from the RocketMQ bin directory):
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
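Before moving on, it can help to confirm that the name server is actually listening. The following is a minimal sketch, not part of the original setup: the class name `NameServerCheck` and the 2-second timeout are assumptions chosen for illustration, and the check only proves that the TCP port is open, not that the broker has registered.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class NameServerCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is listening there.
            return false;
        }
    }

    public static void main(String[] args) {
        // 127.0.0.1:9876 is the name server address passed to mqbroker.cmd above.
        boolean up = isReachable("127.0.0.1", 9876, 2000);
        System.out.println(up ? "name server port is open"
                              : "name server port is not reachable");
    }
}
```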
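I. Installing the RocketMQ console
Download the console source from the Tags page of apache/rocketmq-externals on GitHub.
After downloading, modify the configuration file (for example, so that the console points at your name server, 127.0.0.1:9876 in this setup):
Build the package: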
mvn clean package -Dmaven.test.skip=true
Go into the target directory and start the jar:
java -jar rocketmq-console-ng-1.0.0.jar
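Once RocketMQ is up, run the console project. The first build usually downloads a large number of Maven dependencies.
The console UI after startup:
II. Integrating Spring Cloud Stream with RocketMQ
1. Add the dependency
Make sure the version matches the Spring Cloud Alibaba release you are using.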
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
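2. Enable input and output
Add @EnableBinding to the main class. This annotation-based model (@EnableBinding, @StreamListener, Source, Sink) is deprecated since Spring Cloud Stream 3.1 in favor of functional bindings, so it applies to older versions.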
package com.wxl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

// Source exposes the "output" channel, Sink exposes the "input" channel.
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
3. The key step: the configuration file
spring:
  cloud:
    stream:
      bindings:
        input: # consumer side
          destination: TestTopic
          group: scGroup
        output: # producer side
          destination: TestTopic
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
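Because the nesting of this file is what ties the `input` and `output` binding names to the topic, it may help to see the same settings as flat property keys. The sketch below is only an illustration using the JDK: `BindingKeys`, `node`, and `exampleConfig` are hypothetical helper names, and the nested map is a hand-built copy of the values above, not something Spring produces.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BindingKeys {

    /** Builds an insertion-ordered map from alternating key/value arguments. */
    static Map<String, Object> node(Object... keyValues) {
        Map<String, Object> m = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            m.put((String) keyValues[i], keyValues[i + 1]);
        }
        return m;
    }

    /** Hand-built copy of the configuration shown above. */
    static Map<String, Object> exampleConfig() {
        return node("spring", node("cloud", node("stream", node(
            "bindings", node(
                "input", node("destination", "TestTopic", "group", "scGroup"),
                "output", node("destination", "TestTopic")),
            "rocketmq", node("binder", node("name-server", "127.0.0.1:9876"))))));
    }

    /** Recursively flattens a nested map into dotted property keys. */
    static Map<String, String> flatten(String prefix, Map<String, Object> node) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) e.getValue();
                out.putAll(flatten(key, child));
            } else {
                out.put(key, String.valueOf(e.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints one "key=value" line per leaf setting.
        flatten("", exampleConfig()).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The consumer and producer bindings both resolve to `TestTopic`, which is why a message sent through `output` arrives at the listener on `input`.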
4. Main class
package com.wxl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}
5. Controller
package com.wxl.controller;

import com.wxl.basic.ScProducer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/MQTest")
public class MQTestController {

    @Resource
    private ScProducer producer;

    // Hands the "message" request parameter to the producer.
    @RequestMapping("/sendMessage")
    public String sendMessage(String message) {
        producer.sendMessage(message);
        return "Message sent";
    }
}
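To exercise this endpoint without a browser, a small JDK HTTP client is enough. This is a sketch under assumptions: the application is taken to listen on Spring Boot's default port 8080, `SendMessageClient` and `buildSendUri` are hypothetical names, and `java.net.http` requires Java 11 or later.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SendMessageClient {

    /** Builds the request URI, URL-encoding the "message" query parameter. */
    static URI buildSendUri(String baseUrl, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/MQTest/sendMessage?message=" + encoded);
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the application runs locally on the default port 8080.
        URI uri = buildSendUri("http://localhost:8080", "hello rocketmq");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Prints the HTTP status code followed by the controller's reply.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the whole chain works, the consumer's console output should show the same text shortly after the request returns.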
6. Message consumer
package com.wxl.basic;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
public class ScConsumer {

    // Invoked for every message that arrives on the "input" binding.
    @StreamListener(value = Sink.INPUT)
    public void onMessage(String message) {
        System.out.println("received message: " + message + " from binding: " + Sink.INPUT);
    }
}
7. Message producer
package com.wxl.basic;

import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Component
public class ScProducer {

    @Resource
    private Source source;

    public void sendMessage(String msg) {
        // Attach a RocketMQ tag to the message through its headers.
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
        // Send through the "output" binding, which targets TestTopic.
        this.source.output().send(message);
    }
}
To be continued...