IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ -> 正文阅读

[大数据]RabbitMQ

消息分发机制

  • 发布订阅:向每个消费者服务器都发送所有消息
  • 轮询分发:无视消费者服务器性能平均分发
  • 公平分发:根据消费者服务器的性能决定
  • 重发:当消费者服务器宕机后会将消息重新发送
  • 消息拉去:消费者服务器主动发送请求获取消息

高可用机制

  • 主从共享:主服务进行写入一块数据区域,所有服务共享数据
  • 主从分离:主服务向所有从服务发送消息,进行同步数据
  • 多主多从:可以向任意结点进行写入,然后同步所有服务
  • 转发模式:将消息的元数据进行存储,依次遍历所有节点
  • 分库主从:每个小组做主从,所有小组间做分库

安装RabbitMQ

下载RabbitMQ和Erlang安装包

安装RabbitMQ和Erlang可执行文件

配置Erlang的bin目录环境变量到Path

在rabbit的sbin目录下管理员身份执行cmd命令

abbitmq-service.bat remove
set RABBITMQ_BASE=D:\Java\RabbitMQ\rabbitmq\data
rabbitmq-service.bat install
rabbitmq-plugins.bat enable rabbitmq_management

访问localhost:15672

用户名和密码guest

角色权限

none
management
policymaker
monitoring
administrator

搭建简单模式

//生产者搭建
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("zhangsan");
        factory.setPassword("333");
        factory.setVirtualHost("/");
        //创建链接
        Connection connection = factory.newConnection("producerConnection");
        //创建通道
        Channel channel = connection.createChannel();
        //指定队列名
        String queueName = "queue1";
        //队列名称,是否持久化,是否独占,是否自动删除队列,携带附属参数
        channel.queueDeclare(queueName, false, false, false, null);
        //交换机名,类型
        String exchangeName = "exchange1";
        String exchangeType = "direct";
        //交换机名,true表示不会因为服务重启交换机丢失
        channel.exchangeDeclare(exchangeName,  exchangeType, true);
        //绑定队列和交换机
        channel.queueBind("myQueue", "myExchange", "myRoutingKey");
        //发布
        String msg = "Hello World";
        channel.basicPublish(exchangeName, queueName,null, msg.getBytes());
        //关闭连接
        channel.close();
        connection.close();
    }
}
//消费者搭建
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("zhangsan");
        factory.setPassword("333");
        factory.setVirtualHost("/");
        //创建连接
        Connection connection = factory.newConnection("consumerConnection");
        //创建通道
        Channel channel = connection.createChannel();
        String queueName = "张三";
        channel.basicConsume(queueName, true, new DeliverCallback() {
            public void handle(String s, Delivery delivery) throws IOException {
                System.out.println("收到消息" + new String(delivery.getBody(), "UTF-8"));
            }
        }, new CancelCallback() {
            public void handle(String s) throws IOException {
                System.out.println("接收消息失败");
            }
        });
        System.out.println("接收中。。。");
        System.in.read();
        connection.close();
        channel.close();
    }
}

AMQP协议

先创建长连接,之后开启通道进行通信

交换机接收消息并将消息发送到指定队列

RoutingKey进一步指定消息要发送给哪些队列

VirtualHosts将Broker服务进行虚拟分区

Fanout模式

设置交换机模式是fanout模式

将交换机与相应队列都进行绑定

direct模式

设置交换机模式是direct模式

将交换机队列绑定并指定RoutingKey

Topic模式

设置交换机模式是topic模式

可以指定模糊匹配的RoutingKey

#表示0-n级目录,*表示1级目录

Headers模式

设置交换机的模式是Headers

设置参数key和value

轮询模式

autoAck为true的时候为自动应答

channel.basicConsumer(“queue”, true, deliverCallback);

公平模式

一次传送的消息数量

channel.basicQos(1);

autoAck为false的时候为手动应答

channel.basicConsumer(“queue”, false, deliverCallback);

channel.basicAck(delivery.getEnvelop().getDeliveryTag(), false);

整合SpringBoot

#配置application.yaml
server:
    port:
        8080
spring:
    rabbitmq:
        username: zhangsan
        password: 333
        virtual-host: /
        host: 127.0.0.1
        port: 5672
@Configuration
public class RabbitMqConfiguration {
    //创建交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("myExchange", true, false)
    }
    //创建队列
    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", true)
    }
    //完成绑定关系
    public Binding myBind() {
        return BindingBuilder.bin(myQueue()).to(fanoutExchange());
    }
}
//生产者发送消息
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void product() {
        String exchangeName = "myExchange";
        String queueName = "myQueue";
        rabbitTemplate.convertAndSend(exchangeName, queueName);
    }
}
//消费者接受消息
@Component
@RabbitListener(queue = {"myQueue"})
public class Consumer {
    @RabbitHandler
    public void consume(String message) {
        System.out.println(message);
    }
}

过期时间TTL

设置队列TTL

@Configuration
public class TTLRabbitMQConfiguration {
    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange("ttlDirectExchange", true, false);
    }
    @Bean
    public Queue ttlDirectQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        return new Queue("ttlDirectionQueue", true, false, false, args)
    }
    @Bean
    public Binding ttlBinding() {
        return BindingBuilder.bin(ttlDirectQueue()).to(ttlDirectExchange()).with("routingKey");
    }
}

设置消息TTL

public class Producer {
    @Autowired    private RabbitTemplate rabbitTemplate;        
    public void product() {        
    	String exchangeName = "ttlDirectExchange";        
    	String routingKey = "ttl";        
    	MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {            
    		public Message postProcessMessage(Message message) {                
    			message.getMessageProperties().setExpiration("5000");                
    			message.getMessageProperties().setContentEncoding("UTF-8");                
    			return message;            
    		}        
    	}        
    	rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, messagePostProcessor);    
    }
}

死信队列

消息过期,消息被拒绝,队列达到最大长度

@Configurationpublic 
class RabbitConfiguration {    
	@Bean    
	public DirectExchanges deadDirect() {        
		return new DirectExchage("deadExchange", true, false);    
	}    
	@Bean    
	public Queue deadQueue() {        
		return new Queue("deadQueue", true);    
	}    
	@Bean    
	public Binding deadBinding() {        
		return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");    
	}    
	@Bean    
	public Queue ttlDirectQueue() {        
		Map<String, Object> args = new HashMap<>();        
		args.put("x-message-ttl", 5000);        
		args.put("x-dead-letter-exchange", "dead_direct_exchange");        
		args.put("x-dead-letter-routing-key", "dead");        return new Queue("ttlDirectionQueue", true, false, false, args).with("routingKey")    
	}
}

集群

普通集群

? 主节点存储所有数据,从节点存储除消息队列中消息以外的数据

? 生产者向主节点发送数据,消费者可以从任意结点获取消息,该消息从主节点调取

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 12:48:25  更:2021-07-30 12:48:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 8:06:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码