IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 在linux下,使用AMQP-CPP开发rabbitmq c++ -> 正文阅读

[大数据]在linux下,使用AMQP-CPP开发rabbitmq c++

首先介绍一下背景以及我为什么选择AMQP来开发

1. rabbitmq作为我们的日志中转

2. 生产者只能为单进程(项目要求),且进程还需要处理其他事情,所以发送者不能够阻塞进程

3. 必须实现心跳功能(我们是与代理服务器相连,如果没有心跳且没传输数据时会被代理关闭连接,除非设置代理不关闭连接,比如nginx在十分钟没有数据的情况下就会端开连接,可以手动设置nginx的保活时间时间长一点来解决,但是治标不治本,所以最好实现心跳)

由于上述三点,我比较了SimpleAmqpClient 与AMQP-CPP,给出自己的观点:

1.?SimpleAmqpClient与AMQP-CPP都是由c++封装的rabbitmq

2.?SimpleAmqpClient不支持异步,发送会阻塞进程。AMQP-CPP采用回调,异步完成发送

3. 我没有找到SimpleAmqpClient的心跳发送函数,只找到设置,只能接收到服务器给的心跳,无法响应,所以会被断开(可能在其他地方,我没找到)。AMQP-CPP在回调onNegotiate中与服务器协商间隔,在onHeartbeat接收服务器心跳并手动调用heartbeat发送心跳。

4.?SimpleAmqpClient的登录不能异步,这个在运行过程中如果重新登录也会阻塞住进程发的运行,毕竟我们只有一个进程。

5.?SimpleAmqpClient的只能编译成动态库,如果要使用静态库需要改源码。AMQP-CPP可编译静态库与动态库。

注:SimpleAmqpClient可以实现非阻塞的登录,需要修改代码。

基于上述问题与特性,我选择AMQP-CPP来开发。

1. 在github上clone下来,地址:https://github.com/CopernicaMarketingSoftware/AMQP-CPP

2. 编译:

????????mkdir build
????????cd build
????????cmake .. [-DAMQP-CPP_BUILD_SHARED] [-DAMQP-CPP_LINUX_TCP]
????????cmake --build . --target install

? ? ? ? eg:cmake ..? -DAMQP-CPP_BUILD_SHARED=OFF -DAMQP-CPP_LINUX_TCP=ON

3. 使用AMQP-CPP

????????AMQP-CPP可以自己实现网络部分,也可以直接用libevent,我选择直接用libevent

? ? ? ? 首先:继承LibEventHandler,并实现自己的函数:

????????

 class LibEventHandlerMyError : public AMQP::LibEventHandler                                   
 {
 public:
     LibEventHandlerMyError(struct event_base* evbase);
     
     ~LibEventHandlerMyError();
 
     void onError(AMQP::TcpConnection *connection, const char *message) override;
 
     virtual void onHeartbeat(AMQP::TcpConnection *connection);                                
 
     virtual void onReady(AMQP::TcpConnection *connection);
 
     virtual void onConnected(AMQP::TcpConnection *connection);
 
     virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval);
 public:                           
 
 private: 
    
     struct event_base* evbase_ {nullptr};
};

创建与rabbitmq的连接有三个类比较重要(我用的):AMQP::TcpConnection,AMQP::TcpChannel?LibEventHandlerMyError(继承LibEventHandler,以使用网络)

1. auto event_base = event_base_new();

2.? handle = ?new LibEventHandlerMyError(event_base, login);

3.?tcp_connect_ = new AMQP::TcpConnection (this, AMQP::Address(host_.c_str(), port_,
? ? ? ? ? ? AMQP::Login(user_.c_str(), password_.c_str()), vhost_.c_str()));

4.?tcp_channel_ = new AMQP::TcpChannel(tcp_connect_);

handle为LibEventHandlerMyError实例,login为我自己的登录结构体,创建TcpConnection时,我用的自己的LibEventHandlerMyError,所以用的this.

tcp_channel_就是用来操作的对象,发送,接收等

生产者过程:

 tcp_channel_->startTransaction();
?AMQP::Envelope env(msg, strlen(msg));
?env.setDeliveryMode(2);//这一步是设置属性,还有其他很多属性,具体的需要看头文件,我这里是设置为持久化信息

?tcp_channel_->publish(exchange, route_key, env);?
?tcp_channel_->commitTransaction()
? ?.onSuccess([]()mutable {
? ? ?  std::cout<<"success"<<std::endl;
     })
???.onError([](const char *err_msg)mutable {
? ? ? ? std::cout<<"send msg fail:"<< err_msg<<std::endl;
? ? ?});

注意:写到此并不能将消息发送出去,因为我是单进程,如果你是多线程,你可以开一个线程去执行event_base_dispatch(evbase_)也就是 event_base_loop(evbase_, 0);这里将网络事件进行轮询监听,并调用相应的回调,记住:这个函数是死循环,也就是无法主动退出,必须手动调用event_base_loopbreak,但是我是单进程,所以我是手动调用event_base_loop(evbase_, 2),EVLOOP_NONBLOCK ,来触发监听。至此,生产者发送完毕。

消费者大同小异了
????????

 tcp_channel_->consume(queue_name)
     .onReceived([](const AMQP::Message &msg, uint64_t tag, bool redelivered){
             .....todo
             channel->ack(tag);
             })  
 .onError([](const char *err_msg){
         printf("consume fail:%s", err_msg);
         });
             

over

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-13 17:31:50  更:2021-07-13 17:34:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/6 18:30:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码