IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> C++知识库 -> Windows c++中RabbitMQ-c特殊消息队列的保姆教程 -> 正文阅读

[C++知识库]Windows c++中RabbitMQ-c特殊消息队列的保姆教程

RabbitMQ-c 从安装到c++客户端项目实战

实现需求:
c++项目实现RabbitMQ客户端与服务端交互数据,MES信息传输。

# 1.安装RabbitMQ
此处只表示Windows10下的安装,其他Linux之类的安装请搜寻其他资料。
开发工具
visual studio 2015
cmake 3.21.3工具下载网址没有版本要求
RabbitMQ Windows版本地服务器

#(1)RabbitMQ服务器安装
RabbitMQ是要依赖Erlang的,安装之前先装Erlang。
Erlang下载路径官网
RabbitMQ下载路径官网,因为本人下载的是rabbitmq-server-3.9.7.exe,搭建成功并与能与本地项目通讯,所以推荐可以直接下此版本。如图:在这里插入图片描述此版本上说明了是依赖于Erlang/OTP 23.2链接 和 Erlang 24链接的(以免你们难找所以下载链接都贴上)在这里插入图片描述
上述步骤安装完成后;
就可以在浏览器中登录本地刚搭建的RabbitMQ服务器了,默认网址访问http://localhost:15672 进行测试,默认登录账户为:guest / guest。
注意:如果浏览器打不开网址,可能是默认安装RabbitMQ插件节点没有开启,在windows开始栏找到如图cmd打开在这里插入图片描述
输入 开启RabbitMQ节点 命令
rabbitmqctl start_app
#开启RabbitMQ管理模块的插件,并配置到RabbitMQ节点上
rabbitmq-plugins enable rabbitmq_management
如若配置失败,可能安装出错,卸载RabbitMQ服务端和Erlang之后重新安装(先安装Erlang完成后再安装RabbitMQ

如果需要关闭RabbitMQ节点,可以使用下面的命令:
rabbitmqctl stop_app

#(2)RabbitMQ-c安装
下载拉取源码:RabbitMQ-c,本地懒得配置git的可以直接下载zip包在这里插入图片描述
打开cmake软件,构建自己的vs工程文件。
在这里插入图片描述
我用的是vs2015,win32程序,点击Configure按钮配置在这里插入图片描述
点击完成后点击==Generate,按图取消其值的勾选,生成成功。在这里插入图片描述
如果要修改cmake配置,点击File->Delete cache就行了在这里插入图片描述

然后就是生成我们自己软件调用的DLL了。
点击open Project按钮,或者直接上面的build输出文件夹下打开vs工程,在这里插入图片描述
在这里插入图片描述
成功后在生成目录下可以看到我们所需要的DLL文件在这里插入图片描述
至此,所有的RabbitMQ配置已经完成,剩下就是如何在我们的项目中用c++语言构建RabbitMQ客户端与本地搭建的服务器通信了。

本地登录
在这里插入图片描述

出现了如上界面证明本地服务器是能正常运行的。
# 2.C++ RabbitMQ客户端
网上还有直接用SimpleAmqpClient的例子,但是搭建起来又要花费好多时间,我这里在搜索了大量资料后直接对RabbitMQ-c重新封装了一层供自己调用。
例子:vs c++创建的win32应用台程序

头文件MainTest.h

#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <windows.h>
#include <iostream>
#include <atltime.h>
#include "../RabbitMQClient/RabbitmqClient.h"
#include <time.h>
using namespace std;

#pragma comment(lib,"RabbitmqClient.lib")

enum _RABBITMQ_MESSAGE_TYPE
{
	_JSONSTR_Machine_status = 0,	//机台状态
	_JSONSTR_Alarm_info,			//报警信息传输
	_JSONSTR_update_Product_info,	//生产信息更新传输
};

class Vm
{
private:
	CRabbitmqClient *m_rabbitmq_client;
	string m_HostIP;//IP地址
	string m_Port;//端口号
	string m_username;//rabbitmq登录用户名
	string m_password;//rabbitmq登录密码

public:
	Vm();//构造函数
	~Vm();//析构函数
	void setConnectInfo(string IP, string port, string username, string password);
	
	FILE* m_rabbitFile = NULL;//发送失败文件句柄,发送失败保存当前信息,下一次发送成功时重传
	bool m_Rab_Isconnect = false;//是否连接
	bool Get_connect_status() { return m_Rab_Isconnect; }
	bool Rab_Connect();//连接RabbitMQ服务器
	bool Rab_DisConnect();//断开连接

	string GetExePath();//获取程序执行路径
	int Producer_Send(_RABBITMQ_MESSAGE_TYPE type, string message);//发送
	int Consumer_Rcv(string duquename, vector<string>& vgetmsg, int getnum = 1);//接收
};

main函数所在文件MainTest.cpp

// MainTest.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"
#include "MainTest.h"
#include <io.h>
#include <fstream>
#include <wchar.h>


string Vm::GetExePath()
{
	//获得路径
	string strExePath;
	char szFilePath[MAX_PATH + 1];
	GetModuleFileNameA(NULL, szFilePath, MAX_PATH);
	(strrchr(szFilePath, _T('\\')))[1] = 0;
	strExePath = szFilePath;

	return strExePath;

}

Vm::Vm()
{
	m_rabbitmq_client = CRabbitmqClient::GetInstance();
}

Vm::~Vm()
{
	delete m_rabbitmq_client;
	m_rabbitmq_client = NULL;
}


//转换成utf8格式的字符串,以免发送中文在Rabbitmq服务器上显示乱码
string string_To_UTF8(const std::string & str)
{
	int nwLen = ::MultiByteToWideChar(CP_ACP, 0, str.c_str(), -1, NULL, 0);

	wchar_t * pwBuf = new wchar_t[nwLen + 1];//一定要加1,不然会出现尾巴
	ZeroMemory(pwBuf, nwLen * 2 + 2);

	::MultiByteToWideChar(CP_ACP, 0, str.c_str(), str.length(), pwBuf, nwLen);

	int nLen = ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, NULL, NULL, NULL);

	char * pBuf = new char[nLen + 1];
	ZeroMemory(pBuf, nLen + 1);

	::WideCharToMultiByte(CP_UTF8, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL);

	std::string retStr(pBuf);

	delete[]pwBuf;
	delete[]pBuf;

	pwBuf = NULL;
	pBuf = NULL;

	return retStr;
}

//转换显示
string UTF8_To_string(const std::string & str)
{
	int nwLen = MultiByteToWideChar(CP_UTF8, 0, str.c_str(), -1, NULL, 0);

	wchar_t * pwBuf = new wchar_t[nwLen + 1];//一定要加1,不然会出现尾巴
	memset(pwBuf, 0, nwLen * 2 + 2);

	MultiByteToWideChar(CP_UTF8, 0, str.c_str(), str.length(), pwBuf, nwLen);

	int nLen = WideCharToMultiByte(CP_ACP, 0, pwBuf, -1, NULL, NULL, NULL, NULL);

	char * pBuf = new char[nLen + 1];
	memset(pBuf, 0, nLen + 1);

	WideCharToMultiByte(CP_ACP, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL);

	std::string retStr = pBuf;

	delete[]pBuf;
	delete[]pwBuf;

	pBuf = NULL;
	pwBuf = NULL;

	return retStr;
}

void Vm::setConnectInfo(string IP, string port, string username, string password)
{
	m_HostIP = IP;//IP地址
	m_Port = port;//端口号
	m_username = username;//rabbitmq登录用户名
	m_password = password;//rabbitmq登录密码
}

bool Vm::Rab_Connect()
{
	if (m_rabbitmq_client)
	{
		//设置要连接服务器的ip,端口号,用户名,密码
		m_rabbitmq_client->Setconncet_at(m_HostIP, atoi(m_Port.c_str()), m_username, m_password);
		string err = "";

		//连接服务器
		if (m_rabbitmq_client->Connect(err) < 0)
		{
			cout << "连接RabbitMQ提示:" + err << endl;
			return false;
		}
		cout << "连接RabbitMQ服务器成功" << endl;
		m_Rab_Isconnect = true;
		return true;
	}
	return false;
}

bool Vm::Rab_DisConnect()
{
	if (m_rabbitmq_client)
	{
		string err = "";
		if (m_rabbitmq_client->Disconnect(err) < 0)
		{
			cout << "连接RabbitMQ提示:" + err << endl;
			return false;
		}
		cout << "断开连接" << endl;
		m_Rab_Isconnect = false;
		return true;
	}
	return false;
}

int Vm::Producer_Send(_RABBITMQ_MESSAGE_TYPE type, string message)
{
	static string tempfile1 = GetExePath() + "\\RabproductFailMsg.txt";
	static string tempfile2 = GetExePath() + "\\RabalarmFailMsg.txt";
	static string tempfile3 = GetExePath() + "\\RabstatusFailMsg.txt";
	string tempfile;
	string duquename = "";
	switch (type)
	{
	case _JSONSTR_Machine_status:
		tempfile = tempfile3;
		duquename = "Q.EQPT_LOG_D";
		break;
	case _JSONSTR_Alarm_info:
		tempfile = tempfile2;
		duquename = "Q.ALARM_D";
		break;
	case _JSONSTR_update_Product_info:
		tempfile = tempfile1;
		duquename = "Q.PRODUCT_UPDATE_D";
		break;
	default:
		break;
	}

#if 1  /*客户端一直连接服务端,需要断开连接才断开 */
	CQueue queue_temp(duquename);
	string err = "";
	if (m_rabbitmq_client)
	{
		if (Get_connect_status()) //连接成功再进行下一步操作
		{
			//声明一个队列
			if (m_rabbitmq_client->queue_declare(queue_temp, err) < 0)
			{
				cout << "连接RabbitMQ提示:" + err << endl;
				m_rabbitFile = fopen(tempfile.c_str(), "a+");
				string temp = message + "\n";
				if (m_rabbitFile)
				{
					std::fputs(temp.c_str(), m_rabbitFile);
					std::fflush(m_rabbitFile);
					std::fclose(m_rabbitFile);
				}
				return -1;
			}
			else
				cout << "声明队列成功" << endl;

			/*直接使用默认交换机向队列发送消息,路由在声明队列的时候自动绑定到默认交换机,并且与队列同名*/
			//将序列化之后的二进制消息放到指定路由对应的消息队列中
			if (m_rabbitmq_client->publish(message, duquename/*routingkey*/, err) < 0)
			{
				cout << "连接RabbitMQ提示:" + err << endl;
				m_rabbitFile = fopen(tempfile.c_str(), "a+");
				string temp = message + "\n";
				if (m_rabbitFile)
				{
					std::fputs(temp.c_str(), m_rabbitFile);
					std::fflush(m_rabbitFile);
					std::fclose(m_rabbitFile);
				}
				return -1;
			}
			else
			{
				cout << "发送内容成功" << endl;
				if (access(tempfile.c_str(), 0) == 0)//文件存在
				{
					std::ifstream in(tempfile);
					std::string getmsg;
					vector<CMessage> vec;
					while (getline(in, getmsg)) {//逐行读取,遇到换行符
						CMessage mg(getmsg);
						vec.push_back(mg);
					}
					in.close();
					remove(tempfile.c_str());//删除文件重置
					if (vec.size() > 0) {
						if (m_rabbitmq_client->publish(vec, duquename, err) < 0)
						{
							cout << "连接RabbitMQ发送失败信息重发失败:" + err << endl;
							return -1;
						}
					}
				}
			}
		}
		else
		{
			cout << "连接RabbitMQ提示:" + err << endl;
			m_rabbitFile = fopen(tempfile.c_str(), "a+");
			string temp = message + "\n";
			if (m_rabbitFile)
			{
				std::fputs(temp.c_str(), m_rabbitFile);
				std::fflush(m_rabbitFile);
				std::fclose(m_rabbitFile);
			}
		}
		//断开连接
		//m_rabbitmq_client->__sleep(10);
		//m_rabbitmq_client->Disconnect();
		//return 0;
	}
	return 0;

#else /*客户端每发送一次,就建立一次连接与断开 */
	
	/*直接使用默认交换机向队列发送消息,路由在声明队列的时候自动绑定到默认交换机,并且与队列同名*/
	CRabbitmqClient rb_client(m_HostIP, atoi(m_Port.c_str()), m_username, m_password);
	//CExchange exchange(exchangename);
	CQueue queue_temp(duquename);
	//CRabbitmqClient rb_client("127.0.0.1", 5672, "guest", "guest");
	string err = "";
	//if (rb_client)
	{
		//连接服务器
		if (rb_client.Connect(err) < 0)
		{
			cout << "连接RabbitMQ提示:" + err << endl;
			m_rabbitFile = fopen(tempfile.c_str(), "a+");
			string temp = message + "\n";//方便写入与读取
			if (m_rabbitFile)
			{
				std::fputs(temp.c_str(), m_rabbitFile);
				std::fflush(m_rabbitFile);
				std::fclose(m_rabbitFile);
			}
			return -1;
		}
		/*使用默认交换机不用声明交换机*/
		//声明一个交换机
		//if (rb_client.exchange_declare(exchange, err) < 0)
		//{
		//	return -1;
		//}
		//声明一个队列
		if (rb_client.queue_declare(queue_temp, err) < 0)
		{
			cout << "连接RabbitMQ提示:" + err << endl;
			m_rabbitFile = fopen(tempfile.c_str(), "a+");
			string temp = message + "\n";
			if (m_rabbitFile)
			{
				std::fputs(temp.c_str(), m_rabbitFile);
				std::fflush(m_rabbitFile);
				std::fclose(m_rabbitFile);
			}
			return -1;
		}

		/*使用默认交换机不用将交换机绑定到队列*/
		//将交换机绑定到队列, 
		//if (rb_client.queue_bind(queue_temp, exchange, routkeyname, err)<0)
		//{
		//	return -1;
		//}

		//将序列化之后的二进制消息放到指定路由对应的消息队列中
		if (rb_client.publish(message, duquename, err)<0)
		{
			cout << "连接RabbitMQ提示:" + err << endl;
			m_rabbitFile = fopen(tempfile.c_str(), "a+");
			string temp = message + "\n";
			if (m_rabbitFile)
			{
				std::fputs(temp.c_str(), m_rabbitFile);
				std::fflush(m_rabbitFile);
				std::fclose(m_rabbitFile);
			}
			return -1;
		}
		else
		{
			if (access(tempfile.c_str(), 0) == 0)//文件存在
			{
				std::ifstream in(tempfile);
				std::string getmsg;
				vector<CMessage> vec;
				while (getline(in, getmsg)) {//逐行读取,遇到换行符
					CMessage mg(getmsg);
					vec.push_back(mg);
				}
				in.close();
				remove(tempfile.c_str());//删除文件重置
				if (vec.size() > 0) {
					if (rb_client.publish(vec, duquename, err) < 0)
					{
						cout << "连接RabbitMQ发送失败信息重发失败:" + err << endl;
						return -1;
					}
				}
			}
		}
		//断开连接
		rb_client.__sleep(10);
		rb_client.Disconnect();
		return 0;
	}
#endif
}

int Vm::Consumer_Rcv(string duquename, vector<string>& vgetmsg, int getnum)
{
	// CRabbitmqClient rb_client("127.0.0.1", 5672, "guest", "guest");
	CRabbitmqClient rb_client(m_HostIP, atoi(m_Port.c_str()), m_username, m_password);
	//CExchange exchange(exchangename);
	//CQueue queue_temp(duquename);
	string err = "";
	::timeval tvb = { 1,10 };
	//if (m_rabbitmq_client)
	{
		//连接服务器
		if (rb_client.Connect(err) < 0)
		{
			cout << "连接RabbitMQ提示:" + err << endl;
			return -1;
		}
		cout << "连接RabbitMQ服务器成功" << endl;
		
		vgetmsg.clear();
		//接收
		if (rb_client.consumer(duquename, vgetmsg, getnum, &tvb, err)<0)
		{
			cout << "连接RabbitMQ提示:" + err << endl;
			return -1;
		}

		rb_client.__sleep(10);
		rb_client.Disconnect();
		return 0;
	}
}


int main()
{
	Vm Interface;
	Interface.setConnectInfo("localhost", "5672", "guest", "guest");//注:本地网页默认登录端口是15672,监听的端口号为5672
	Interface.Rab_Connect();
	string temp = "测试机台信息发送";
	Interface.Producer_Send(_JSONSTR_Machine_status, string_To_UTF8(temp)/*转化成中文UTF-8格式发送,不然在服务器界面上显示的是乱码*/);
	Sleep(1000);
	Interface.Rab_DisConnect();

	vector<string> msg;
	msg.clear();
	Interface.Consumer_Rcv("Q.EQPT_LOG_D",msg);//从刚刚的队列中取消息
	if(msg.size()>0)
		cout << "连接RabbitMQ接收到的信息为:" + UTF8_To_string(msg[0]) << endl;
	int a = 0;
	scanf("%d", &a);
    return 0;
}


在这里插入图片描述
配置好属性后,编译通过,运行如下:
在这里插入图片描述

想在rabbitMQ服务端能看到,main函数程序中注释掉

int main()
{
	Vm Interface;
	Interface.setConnectInfo("localhost", "5672", "guest", "guest");//注:本地网页默认登录端口是15672,监听的端口号为5672
	Interface.Rab_Connect();
	string temp = "测试机台信息发送";
	Interface.Producer_Send(_JSONSTR_Machine_status, string_To_UTF8(temp)/*转化成中文UTF-8格式发送,不然在服务器界面上显示的是乱码*/);
	Sleep(1000);
	Interface.Rab_DisConnect();

	//vector<string> msg;
	//msg.clear();
	//Interface.Consumer_Rcv("Q.EQPT_LOG_D",msg);//从刚刚的队列中取消息
	//if(msg.size()>0)
	//	cout << "连接RabbitMQ接收到的信息为:" + UTF8_To_string(msg[0]) << endl;
	int a = 0;
	scanf("%d", &a);
    return 0;
}

在这里插入图片描述

点击Q.EQPT_LOG_D,查看刚刚创建的队列具体里面的具体内容
在这里插入图片描述
下拉点击Get Message就能显示刚刚发送的消息了。
在这里插入图片描述
好了,完成了。

3.结语

此文档只有调用程序,没有RabbitMQ c++封装的代码,由于本人也花费了很多精力,此封装程序暂没公开。程序在我主界面资源的下载,保证下载直接就能编译运行使用到各位的c++项目中,如实在不想用积分,请私信。

  C++知识库 最新文章
【C++】友元、嵌套类、异常、RTTI、类型转换
通讯录的思路与实现(C语言)
C++PrimerPlus 第七章 函数-C++的编程模块(
Problem C: 算法9-9~9-12:平衡二叉树的基本
MSVC C++ UTF-8编程
C++进阶 多态原理
简单string类c++实现
我的年度总结
【C语言】以深厚地基筑伟岸高楼-基础篇(六
c语言常见错误合集
上一篇文章      下一篇文章      查看所有文章
加:2021-11-27 09:43:57  更:2021-11-27 09:46:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/6 13:36:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码