代码运行需先安装环境依赖
windows环境下:
先安装Erlang
页面:https://www.erlang.org/downloads
下载链接:https://erlang.org/download/otp_win64_24.1.exe
安装完配置环境变量
ERLANG_HOME C:\Program Files\erl-24.1
然后在path变量里添加 %ERLANG_HOME%/bin
然后打开cmd命令框,输入erl
Eshell V12.1 (abort with ^G)
Erlang安装完成
开始安装rabbitmq
windows环境下
页面:https://www.rabbitmq.com/install-windows.html
下载链接:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.7/rabbitmq-server-3.9.7.exe
client端
package com.liang.demo.RabbitMQ;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author : liyang
* @date : 2021/10/6
* rabbitMQ消费者
*/
public class Client {
public static void main(String[] args) throws Exception {
//连接工厂
ConnectionFactory f = new ConnectionFactory();
f.setHost("ip地址");
f.setUsername("admin");
f.setPassword("admin");
//建立连接
Connection c = f.newConnection();
//建立信道
Channel ch = c.createChannel();
//声明队列,如果该队列已经创建过,则不会重复创建
ch.queueDeclare("task_queue", true, false, false, null);
System.out.println("等待接收数据");
//收到消息后用来处理消息的回调对象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: " + msg);
//遍历字符串中的字符,每个点使进程暂停一秒
for (int i = 0; i < msg.length(); i++) {
if (msg.charAt(i) == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
System.out.println("处理结束");
//参数1:消息标签,参数2:是否确认多条消息
ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//一次只能接受一条数据
ch.basicQos(1);
//第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功
ch.basicConsume("task_queue", false, callback, cancel);
}
}
server端
package com.liang.demo.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Scanner;
/**
* @author : liyang
* @date : 2021/10/6
* rabbit生产者
*/
public class Server {
public static void main(String[] args) throws Exception {
//创建连接工厂,并设置连接信息
ConnectionFactory f = new ConnectionFactory();
f.setHost("ip地址");
f.setPort(5672);//可选,5672是默认端口
f.setUsername("admin");
f.setPassword("admin");
/*
* 与rabbitmq服务器建立连接,
* rabbitmq服务器端使用的是nio,会复用tcp连接,
* 并开辟多个信道与客户端通信
* 以减轻服务器端建立连接的开销
*/
Connection c = f.newConnection();
//建立信道
Channel ch = c.createChannel();
/*
* 声明队列,会在rabbitmq中创建一个队列
* 如果已经创建过该队列,就不能再使用其他参数来创建
*
* 参数含义:
* -queue: 队列名称
* -durable: 队列持久化,true表示RabbitMQ重启后队列仍存在
* -exclusive: 排他,true表示限制仅当前连接可用
* -autoDelete: 当最后一个消费者断开后,是否删除队列
* -arguments: 其他参数
*/
ch.queueDeclare("task_queue", true, false, false, null);
/*
* 发布消息
* 这里把消息向默认交换机发送.
* 默认交换机隐含与所有队列绑定,routing key即为队列名称
*
* 参数含义:
* -exchange: 交换机名称,空串表示默认交换机"(AMQP default)",不能用 null
* -routingKey: 对于默认交换机,路由键就是目标队列名称
* -props: 其他参数,例如头信息
* -body: 消息内容byte[]数组
*/
// ch.basicPublish("", "helloworld", null, "Hello world!".getBytes());
// System.out.println("消息已发送");
while (true) {
//控制台输入的消息发送到rabbitmq
System.out.print("输入消息: ");
String msg = new Scanner(System.in).nextLine();
//如果输入的是"exit"则结束生产者进程
if ("exit".equals(msg)) {
break;
}
//参数:exchage,routingKey,props,body;
//MessageProperties.PERSISTENT_TEXT_PLAIN持久化的设置
ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
System.out.println("消息已发送: " + msg);
}
c.close();
}
}
pom添加依赖:
<!--rabbitMQ依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
|