IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMq生产者消费者代码 -> 正文阅读

[大数据]RabbitMq生产者消费者代码

生产者:
package com.atguigu.util; /**
 * Copyright (c)  牧原 All Rights Reserved
 * <p>
 * Project: Producer
 * Package: com.atguigu.util
 * Version: 1.0
 * <p>
 * Created by songquan on 2022/4/26 上午10:13
 */


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.postgresql.core.QueryExecutor;
import org.postgresql.util.HostSpec;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

/**
 * @topic:
 * @desc:
 * @author: songquan tel:18211850987
 * @department:牧原食品-肉食总部数字化部-肉食总部大数据分析与应用科
 * @datatime: 2022/4/26 上午10:13
 */
public class Producer {


    //队列名称

    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.company.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.department.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.position.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.rank.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.employee.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.university.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.taginfo.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.district.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bank.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.category.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.stordoc.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.measdoc.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.mattaxes.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.customer.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.material.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.bankaccbas.queue";
//    public static final String QUEUE_NAME = "data.etl.syn.rs.mdm.supplier.queue";


    //发消息
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工厂,该连接工厂其实就对应着我们访问http://182.92.210.39:15672/网站之后的rabbitmq,从这个工厂里可以获取队列
        ConnectionFactory factory = new ConnectionFactory() {

            public QueryExecutor openConnectionImpl(HostSpec[] hostSpecs, String s, String s1, Properties properties) throws SQLException {
                return null;
            }
        };

        //工厂IP连接RabbitMQ的队列
        factory.setHost("10.106.11.37");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");

        //创建连接
        //Connection connection =  factory.newConnection();
        Connection connection = factory.newConnection();
        //获取信道,通过这个信道可以连接交换机Exchange,然后再连接队列Queue
        Channel channel = (connection).createChannel();

        /**
         * 生成一个队列,此队列中可以存放消息
         *
         * 1.队列名称
         * 2.队列里面的消息是否持久化到磁盘中
         * 3.该消息队列是否共享,true表示多个消费者可访问此消息队列,false表示只有一个消费者可访问此消息队列
         * 4.是否自动删除,最后一个消费者断开连接以后,该消息队列是否自动删除
         * 5.其它参数
         * */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //要发送到hello队列中的消息
       // String message = "hello world";
        String message = "{\"table_name\":\"md_company\", \"type\":\"insert\", \"time_now\":\"2022-05-09 11:20:10\", \"id\":\"2\", \"tenant_id\":\"2\", \"code\":\"2\", \"name\" :\"2\", \"short_name\":\"2\", \"bank_account\":\"2\", \"bank_name\":\"2\", \"tax_payer_no\":\"2\", \"legal_person\":\"2\", \"tel\":\"2\", \"established_date\":\"2\", \"business_license\":\"2\", \"is_region\":\"2\" , \"is_bm_payment\":\"2\", \"province_code\":\"2\", \"city_code\":\"2\" , \"address\":\"2\" , \"company_type_code\":\"2\", \"is_used\":\"2\", \"create_user\":\"2\", \"create_dept\":\"2\", \"create_time\":\"2022-05-09 11:20:10\", \"update_user\":\"2\", \"update_time\":\"2022-05-09 11:20:18\", \"status\":\"2\", \"is_deleted\":\"2\", \"edge_form_id\":\"2\" , \"data_type\":\"2\",\"exchange_status\":\"2\"}";
        /**
         * 往队列中发送一个消息
         *
         * 1.发送到哪个交换机
         * 2.路由的Key值是哪个,本次是队列的名称
         * 3.其它参数信息
         * 4.发送消息的消息体,需要转换成Byte数组
         * */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

        //如果消息成功的发送到了hello队列中,那么会输出这句代码
        System.out.println("消息发送完毕");
    }

}

消费者:

package com.atguigu.util; /**
 * Copyright (c)  牧原 All Rights Reserved
 * <p>
 * Project: Consumer
 * Package: com.atguigu.util
 * Version: 1.0
 * <p>
 * Created by songquan on 2022/4/26 上午10:30
 */


import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @topic:
 * @desc:
 * @author: songquan tel:18211850987
 * @department:牧原食品-肉食总部数字化部-肉食总部大数据分析与应用科
 * @datatime: 2022/4/26 上午10:30
 */
public class Consumer {
    //消费者要获取哪个队列中的消息
    public static final String QUEUE_NAME="data.etl.syn.rs.mdm.company.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.106.11.37");
        //用户名
        factory.setUsername("admin");
        //密码
        factory.setPassword("123456");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //如果能成功接收到消息会调用的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println(new String(message.getBody()));
        };

        //如果取消从消息队列中获取消息时会调用的回调函数
        CancelCallback cancelCallback= consumerTag->{
            System.out.println("消息消费被中断");
        };

        /**
         *  消费者消费消息,也即是消费者从消息队列中取消息
         *
         *  1.消费哪个队列
         *  2.消费成功之后是否要自动应答,true代表的是自动应答,false代表的是手动应答
         *  3.消费者成功消费的回调
         *  4.消费者取消消费的回调
         * */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-10 11:58:25  更:2022-05-10 11:59:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:42:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码