消息分发机制
- 发布订阅:向每个消费者服务器都发送所有消息
- 轮询分发:无视消费者服务器性能平均分发
- 公平分发:根据消费者服务器的性能决定
- 重发:当消费者服务器宕机后会将消息重新发送
- 消息拉去:消费者服务器主动发送请求获取消息
高可用机制
- 主从共享:主服务进行写入一块数据区域,所有服务共享数据
- 主从分离:主服务向所有从服务发送消息,进行同步数据
- 多主多从:可以向任意结点进行写入,然后同步所有服务
- 转发模式:将消息的元数据进行存储,依次遍历所有节点
- 分库主从:每个小组做主从,所有小组间做分库
安装RabbitMQ
下载RabbitMQ和Erlang安装包
安装RabbitMQ和Erlang可执行文件
配置Erlang的bin目录环境变量到Path
在rabbit的sbin目录下管理员身份执行cmd命令
abbitmq-service.bat remove
set RABBITMQ_BASE=D:\Java\RabbitMQ\rabbitmq\data
rabbitmq-service.bat install
rabbitmq-plugins.bat enable rabbitmq_management
访问localhost:15672
用户名和密码guest
角色权限
none
management
policymaker
monitoring
administrator
搭建简单模式
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("zhangsan");
factory.setPassword("333");
factory.setVirtualHost("/");
Connection connection = factory.newConnection("producerConnection");
Channel channel = connection.createChannel();
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
String exchangeName = "exchange1";
String exchangeType = "direct";
channel.exchangeDeclare(exchangeName, exchangeType, true);
channel.queueBind("myQueue", "myExchange", "myRoutingKey");
String msg = "Hello World";
channel.basicPublish(exchangeName, queueName,null, msg.getBytes());
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("zhangsan");
factory.setPassword("333");
factory.setVirtualHost("/");
Connection connection = factory.newConnection("consumerConnection");
Channel channel = connection.createChannel();
String queueName = "张三";
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到消息" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收消息失败");
}
});
System.out.println("接收中。。。");
System.in.read();
connection.close();
channel.close();
}
}
AMQP协议
先创建长连接,之后开启通道进行通信
交换机接收消息并将消息发送到指定队列
RoutingKey进一步指定消息要发送给哪些队列
VirtualHosts将Broker服务进行虚拟分区
Fanout模式
设置交换机模式是fanout模式
将交换机与相应队列都进行绑定
direct模式
设置交换机模式是direct模式
将交换机队列绑定并指定RoutingKey
Topic模式
设置交换机模式是topic模式
可以指定模糊匹配的RoutingKey
#表示0-n级目录,*表示1级目录
Headers模式
设置交换机的模式是Headers
设置参数key和value
轮询模式
autoAck为true的时候为自动应答
channel.basicConsumer(“queue”, true, deliverCallback);
公平模式
一次传送的消息数量
channel.basicQos(1);
autoAck为false的时候为手动应答
channel.basicConsumer(“queue”, false, deliverCallback);
channel.basicAck(delivery.getEnvelop().getDeliveryTag(), false);
整合SpringBoot
server:
port:
8080
spring:
rabbitmq:
username: zhangsan
password: 333
virtual-host: /
host: 127.0.0.1
port: 5672
@Configuration
public class RabbitMqConfiguration {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("myExchange", true, false)
}
@Bean
public Queue myQueue() {
return new Queue("myQueue", true)
}
public Binding myBind() {
return BindingBuilder.bin(myQueue()).to(fanoutExchange());
}
}
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void product() {
String exchangeName = "myExchange";
String queueName = "myQueue";
rabbitTemplate.convertAndSend(exchangeName, queueName);
}
}
@Component
@RabbitListener(queue = {"myQueue"})
public class Consumer {
@RabbitHandler
public void consume(String message) {
System.out.println(message);
}
}
过期时间TTL
设置队列TTL
@Configuration
public class TTLRabbitMQConfiguration {
@Bean
public DirectExchange ttlDirectExchange() {
return new DirectExchange("ttlDirectExchange", true, false);
}
@Bean
public Queue ttlDirectQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);
return new Queue("ttlDirectionQueue", true, false, false, args)
}
@Bean
public Binding ttlBinding() {
return BindingBuilder.bin(ttlDirectQueue()).to(ttlDirectExchange()).with("routingKey");
}
}
设置消息TTL
public class Producer {
@Autowired private RabbitTemplate rabbitTemplate;
public void product() {
String exchangeName = "ttlDirectExchange";
String routingKey = "ttl";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
public Message postProcessMessage(Message message) {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
}
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, messagePostProcessor);
}
}
死信队列
消息过期,消息被拒绝,队列达到最大长度
@Configurationpublic
class RabbitConfiguration {
@Bean
public DirectExchanges deadDirect() {
return new DirectExchage("deadExchange", true, false);
}
@Bean
public Queue deadQueue() {
return new Queue("deadQueue", true);
}
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");
}
@Bean
public Queue ttlDirectQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);
args.put("x-dead-letter-exchange", "dead_direct_exchange");
args.put("x-dead-letter-routing-key", "dead"); return new Queue("ttlDirectionQueue", true, false, false, args).with("routingKey")
}
}
集群
普通集群
? 主节点存储所有数据,从节点存储除消息队列中消息以外的数据
? 生产者向主节点发送数据,消费者可以从任意结点获取消息,该消息从主节点调取
|