Rabbitmq是一款消息中间件:其凭借高可靠,以扩展,高可用,以及丰富的功能,使得互联网中越来越多的公司用到,所以我们就要学习学习了 消息队列中间件(Message Queue Middleware)简称MQ 消息中间件的作用:解耦 冗余 扩展 削峰 可恢复性 顺序保证(线程) 缓冲 异步通信 Rabbitmq是采用ERlang语言编写的,所以在配置Rabbitmq环境时需要配置ERlang语言 Rabbitmq是实现了AMQP(Advanced Message Queuing Protocol 高级消息队列)协议的消息中间件
安装Rabbitmq:我是基于Centos8版本安装的 (默认有ERlang环境)
1.下载ERlang:[:erlang环境下载地址](https://github.com/rabbitmq/erlang-rpm/releases%EF%BC%9Aerlang%E7%8E%AF%E5%A2%83%E4%B8%8B%E8%BD%BD%E5%9C%B0%E5%9D%80)
2.使用工具放入指定文件夹:rpm -Uvh erlang-24.1-1.el8.x86_64.rpm :解压
3.安装erlang:yum install -y erlang erl -v:安装成功就能看到版本信息
4.安装基本类库:yum install -y socat
5.下载Rabbitmq:[Rabbitmq下载](https://www.rabbitmq.com/install-rpm.html#downloads)
6.rpm -Uvh rabbitmq-server-3.9.7-1.el8.noarch.rpm :解压
7.安装yum install -y rabbitmq-server
8.如果需要web页面访问需要安装插件 rabbitmq-plugins enable rabbitmq_management
9.需要设置用户用来管理:添加用户:rabbitmqctl add_user 名字 密码
分配权限:rabbitmqctl set_user_tags 名字 用户级别
rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*" :为用户设置所有权限
用户级别:administrator:超级管理员
monitoring:监控者:可以看到所有的节点,但是不能操作
policymaker:策略定制者:可以自己创建自己的节点
management:普通管理员:只能看到自己的信息
基本命令:
systemctl start rabbitmq-server :启动服务
systemctl restart rabbitmq-server :重启服务
systemctl stop rabbitmq-server :关闭服务
systemctl status rabbitmq-server :查看服务状态
测试发送HelloWorld:
<!--添加Maven依赖->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
生产者(生产消息):
package ceshi;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitProducer {
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
Connection connection =null;
Channel channel =null;
try {
connectionFactory.setHost();
connectionFactory.setPort();
connectionFactory.setUsername();
connectionFactory.setPassword();
connectionFactory.setVirtualHost();
connectionFactory.setConnectionTimeout();
connection=connectionFactory.newConnection();
channel= connection.createChannel();
channel.queueDeclare("helloworld",false,false,false,null);
channel.basicPublish("","helloworld",null,"helloworld".getBytes());
System.out.println("发送消息成功");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
if (channel!=null) {
channel.close();
}
if (connection!=null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
消费者(接收消息):
package ceshi;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitConsumer {
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
Connection connection =null;
Channel channel =null;
try {
connectionFactory.setHost("123.57.158.37");
connectionFactory.setPort(5672);
connectionFactory.setUsername("lengxin");
connectionFactory.setPassword("123456rjx");
connectionFactory.setVirtualHost("/");
connectionFactory.setConnectionTimeout(50000);
connection=connectionFactory.newConnection();
channel= connection.createChannel();
channel.basicConsume("helloworld",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("传递的消息为:"+new String(body));
}
});
System.out.println("接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
if (channel!=null) {
channel.close();
}
if (connection!=null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
方法参数详解:
1.队列名称 2.是否持久化 3.排他性(独占) 4.是否自动删除(消费完成后) 5.携带参数
channel.queueDeclare("helloworld",false,false,false,null);
1.交换机名称 2.队列名称 3.传递消息额外配置(消息持久化) 4.消息具体内容(必须是字节数组)
channel.basicPublish("","helloworld",null,"helloworld".getBytes());
1.消费队列名称 2.开始消息的自动确认机制 3.消费时的回调接口
channel.basicConsume("helloworld",true,new DefaultConsumer(channel){
相关概念: 生产者(Producer):负责发送(生产)消息 消费者(Consumer):负责接收(消费)消息 Broker服务节点:看做一台Rabbitmq服务器,生产者往进放,消费者往出取 队列: 是Rabbitmq的内部对象,用于储存消息 交换机(Exchange):用来和队列绑定 路由key(Routing key):用来指定这个消息的路由规则(匹配) 绑定(Binding):将交换机与队列关联起来
rabbitmq的几种模式: helloworld模式: 一个生产者和一个消费者绑定同一个队列
工作队列模式: 一个生产者多个消费者,绑定同一个队列,默认是轮询(消费平等,不会因为消费快就多消费,消费慢就少消费) 可以手动设置,使其不是平等消费
发布与订阅模式(Publish/Subscribe)Fanout:绑定同一个交换机,生产者发送消息到交换机,其余队列里的消费者都能监听到消息 Direct(直连):通过Routing key 来配置连接 Topic(主题模式):通过Routing key 动态匹配
SpringBoot整合Rabbitmq :单独写 TTL 死信队列 rabbitmq集群 分布式事务
|