RabbitMQ-c 从安装到c++客户端项目实战
实现需求: c++项目实现RabbitMQ客户端与服务端交互数据,MES信息传输。
# 1.安装RabbitMQ 此处只表示Windows10下的安装,其他Linux之类的安装请搜寻其他资料。 开发工具 visual studio 2015 cmake 3.21.3工具下载网址没有版本要求 RabbitMQ Windows版本地服务器
#(1)RabbitMQ服务器安装 RabbitMQ是要依赖Erlang的,安装之前先装Erlang。 Erlang下载路径官网 RabbitMQ下载路径官网,因为本人下载的是rabbitmq-server-3.9.7.exe,搭建成功并与能与本地项目通讯,所以推荐可以直接下此版本。如图:此版本上说明了是依赖于Erlang/OTP 23.2链接 和 Erlang 24链接的(以免你们难找所以下载链接都贴上) 上述步骤安装完成后; 就可以在浏览器中登录本地刚搭建的RabbitMQ服务器了,默认网址访问http://localhost:15672 进行测试,默认登录账户为:guest / guest。 注意:如果浏览器打不开网址,可能是默认安装RabbitMQ插件节点没有开启,在windows开始栏找到如图cmd打开 输入 开启RabbitMQ节点 命令 rabbitmqctl start_app #开启RabbitMQ管理模块的插件,并配置到RabbitMQ节点上 rabbitmq-plugins enable rabbitmq_management 如若配置失败,可能安装出错,卸载RabbitMQ服务端和Erlang之后重新安装(先安装Erlang完成后再安装RabbitMQ)
如果需要关闭RabbitMQ节点,可以使用下面的命令: rabbitmqctl stop_app
#(2)RabbitMQ-c安装 下载拉取源码:RabbitMQ-c,本地懒得配置git的可以直接下载zip包 打开cmake软件,构建自己的vs工程文件。 我用的是vs2015,win32程序,点击Configure按钮配置 点击完成后点击==Generate,按图取消其值的勾选,生成成功。 如果要修改cmake配置,点击File->Delete cache就行了
然后就是生成我们自己软件调用的DLL了。 点击open Project按钮,或者直接上面的build输出文件夹下打开vs工程, 成功后在生成目录下可以看到我们所需要的DLL文件 至此,所有的RabbitMQ配置已经完成,剩下就是如何在我们的项目中用c++语言构建RabbitMQ客户端与本地搭建的服务器通信了。
本地登录
出现了如上界面证明本地服务器是能正常运行的。 # 2.C++ RabbitMQ客户端 网上还有直接用SimpleAmqpClient的例子,但是搭建起来又要花费好多时间,我这里在搜索了大量资料后直接对RabbitMQ-c重新封装了一层供自己调用。 例子:vs c++创建的win32应用台程序
头文件MainTest.h
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <windows.h>
#include <iostream>
#include <atltime.h>
#include "../RabbitMQClient/RabbitmqClient.h"
#include <time.h>
using namespace std;
#pragma comment(lib,"RabbitmqClient.lib")
enum _RABBITMQ_MESSAGE_TYPE
{
_JSONSTR_Machine_status = 0,
_JSONSTR_Alarm_info,
_JSONSTR_update_Product_info,
};
class Vm
{
private:
CRabbitmqClient *m_rabbitmq_client;
string m_HostIP;
string m_Port;
string m_username;
string m_password;
public:
Vm();
~Vm();
void setConnectInfo(string IP, string port, string username, string password);
FILE* m_rabbitFile = NULL;
bool m_Rab_Isconnect = false;
bool Get_connect_status() { return m_Rab_Isconnect; }
bool Rab_Connect();
bool Rab_DisConnect();
string GetExePath();
int Producer_Send(_RABBITMQ_MESSAGE_TYPE type, string message);
int Consumer_Rcv(string duquename, vector<string>& vgetmsg, int getnum = 1);
};
main函数所在文件MainTest.cpp
#include "stdafx.h"
#include "MainTest.h"
#include <io.h>
#include <fstream>
#include <wchar.h>
string Vm::GetExePath()
{
string strExePath;
char szFilePath[MAX_PATH + 1];
GetModuleFileNameA(NULL, szFilePath, MAX_PATH);
(strrchr(szFilePath, _T('\\')))[1] = 0;
strExePath = szFilePath;
return strExePath;
}
Vm::Vm()
{
m_rabbitmq_client = CRabbitmqClient::GetInstance();
}
Vm::~Vm()
{
delete m_rabbitmq_client;
m_rabbitmq_client = NULL;
}
string string_To_UTF8(const std::string & str)
{
int nwLen = ::MultiByteToWideChar(CP_ACP, 0, str.c_str(), -1, NULL, 0);
wchar_t * pwBuf = new wchar_t[nwLen + 1];
ZeroMemory(pwBuf, nwLen * 2 + 2);
::MultiByteToWideChar(CP_ACP, 0, str.c_str(), str.length(), pwBuf, nwLen);
int nLen = ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, NULL, NULL, NULL);
char * pBuf = new char[nLen + 1];
ZeroMemory(pBuf, nLen + 1);
::WideCharToMultiByte(CP_UTF8, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL);
std::string retStr(pBuf);
delete[]pwBuf;
delete[]pBuf;
pwBuf = NULL;
pBuf = NULL;
return retStr;
}
string UTF8_To_string(const std::string & str)
{
int nwLen = MultiByteToWideChar(CP_UTF8, 0, str.c_str(), -1, NULL, 0);
wchar_t * pwBuf = new wchar_t[nwLen + 1];
memset(pwBuf, 0, nwLen * 2 + 2);
MultiByteToWideChar(CP_UTF8, 0, str.c_str(), str.length(), pwBuf, nwLen);
int nLen = WideCharToMultiByte(CP_ACP, 0, pwBuf, -1, NULL, NULL, NULL, NULL);
char * pBuf = new char[nLen + 1];
memset(pBuf, 0, nLen + 1);
WideCharToMultiByte(CP_ACP, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL);
std::string retStr = pBuf;
delete[]pBuf;
delete[]pwBuf;
pBuf = NULL;
pwBuf = NULL;
return retStr;
}
void Vm::setConnectInfo(string IP, string port, string username, string password)
{
m_HostIP = IP;
m_Port = port;
m_username = username;
m_password = password;
}
bool Vm::Rab_Connect()
{
if (m_rabbitmq_client)
{
m_rabbitmq_client->Setconncet_at(m_HostIP, atoi(m_Port.c_str()), m_username, m_password);
string err = "";
if (m_rabbitmq_client->Connect(err) < 0)
{
cout << "连接RabbitMQ提示:" + err << endl;
return false;
}
cout << "连接RabbitMQ服务器成功" << endl;
m_Rab_Isconnect = true;
return true;
}
return false;
}
bool Vm::Rab_DisConnect()
{
if (m_rabbitmq_client)
{
string err = "";
if (m_rabbitmq_client->Disconnect(err) < 0)
{
cout << "连接RabbitMQ提示:" + err << endl;
return false;
}
cout << "断开连接" << endl;
m_Rab_Isconnect = false;
return true;
}
return false;
}
int Vm::Producer_Send(_RABBITMQ_MESSAGE_TYPE type, string message)
{
static string tempfile1 = GetExePath() + "\\RabproductFailMsg.txt";
static string tempfile2 = GetExePath() + "\\RabalarmFailMsg.txt";
static string tempfile3 = GetExePath() + "\\RabstatusFailMsg.txt";
string tempfile;
string duquename = "";
switch (type)
{
case _JSONSTR_Machine_status:
tempfile = tempfile3;
duquename = "Q.EQPT_LOG_D";
break;
case _JSONSTR_Alarm_info:
tempfile = tempfile2;
duquename = "Q.ALARM_D";
break;
case _JSONSTR_update_Product_info:
tempfile = tempfile1;
duquename = "Q.PRODUCT_UPDATE_D";
break;
default:
break;
}
#if 1
CQueue queue_temp(duquename);
string err = "";
if (m_rabbitmq_client)
{
if (Get_connect_status())
{
if (m_rabbitmq_client->queue_declare(queue_temp, err) < 0)
{
cout << "连接RabbitMQ提示:" + err << endl;
m_rabbitFile = fopen(tempfile.c_str(), "a+");
string temp = message + "\n";
if (m_rabbitFile)
{
std::fputs(temp.c_str(), m_rabbitFile);
std::fflush(m_rabbitFile);
std::fclose(m_rabbitFile);
}
return -1;
}
else
cout << "声明队列成功" << endl;
if (m_rabbitmq_client->publish(message, duquename, err) < 0)
{
cout << "连接RabbitMQ提示:" + err << endl;
m_rabbitFile = fopen(tempfile.c_str(), "a+");
string temp = message + "\n";
if (m_rabbitFile)
{
std::fputs(temp.c_str(), m_rabbitFile);
std::fflush(m_rabbitFile);
std::fclose(m_rabbitFile);
}
return -1;
}
else
{
cout << "发送内容成功" << endl;
if (access(tempfile.c_str(), 0) == 0)
{
std::ifstream in(tempfile);
std::string getmsg;
vector<CMessage> vec;
while (getline(in, getmsg)) {
CMessage mg(getmsg);
vec.push_back(mg);
}
in.close();
remove(tempfile.c_str());
if (vec.size() > 0) {
if (m_rabbitmq_client->publish(vec, duquename, err) < 0)
{
cout << "连接RabbitMQ发送失败信息重发失败:" + err << endl;
return -1;
}
}
}
}
}
else
{
cout << "连接RabbitMQ提示:" + err << endl;
m_rabbitFile = fopen(tempfile.c_str(), "a+");
string temp = message + "\n";
if (m_rabbitFile)
{
std::fputs(temp.c_str(), m_rabbitFile);
std::fflush(m_rabbitFile);
std::fclose(m_rabbitFile);
}
}
}
return 0;
#else
CRabbitmqClient rb_client(m_HostIP, atoi(m_Port.c_str()), m_username, m_password);
CQueue queue_temp(duquename);
string err = "";
{
if (rb_client.Connect(err) < 0)
{
cout << "连接RabbitMQ提示:" + err << endl;
m_rabbitFile = fopen(tempfile.c_str(), "a+");
string temp = message + "\n";
if (m_rabbitFile)
{
std::fputs(temp.c_str(), m_rabbitFile);
std::fflush(m_rabbitFile);
std::fclose(m_rabbitFile);
}
return -1;
}
if (rb_client.queue_declare(queue_temp, err) < 0)
{
cout << "连接RabbitMQ提示:" + err << endl;
m_rabbitFile = fopen(tempfile.c_str(), "a+");
string temp = message + "\n";
if (m_rabbitFile)
{
std::fputs(temp.c_str(), m_rabbitFile);
std::fflush(m_rabbitFile);
std::fclose(m_rabbitFile);
}
return -1;
}
if (rb_client.publish(message, duquename, err)<0)
{
cout << "连接RabbitMQ提示:" + err << endl;
m_rabbitFile = fopen(tempfile.c_str(), "a+");
string temp = message + "\n";
if (m_rabbitFile)
{
std::fputs(temp.c_str(), m_rabbitFile);
std::fflush(m_rabbitFile);
std::fclose(m_rabbitFile);
}
return -1;
}
else
{
if (access(tempfile.c_str(), 0) == 0)
{
std::ifstream in(tempfile);
std::string getmsg;
vector<CMessage> vec;
while (getline(in, getmsg)) {
CMessage mg(getmsg);
vec.push_back(mg);
}
in.close();
remove(tempfile.c_str());
if (vec.size() > 0) {
if (rb_client.publish(vec, duquename, err) < 0)
{
cout << "连接RabbitMQ发送失败信息重发失败:" + err << endl;
return -1;
}
}
}
}
rb_client.__sleep(10);
rb_client.Disconnect();
return 0;
}
#endif
}
int Vm::Consumer_Rcv(string duquename, vector<string>& vgetmsg, int getnum)
{
CRabbitmqClient rb_client(m_HostIP, atoi(m_Port.c_str()), m_username, m_password);
string err = "";
::timeval tvb = { 1,10 };
{
if (rb_client.Connect(err) < 0)
{
cout << "连接RabbitMQ提示:" + err << endl;
return -1;
}
cout << "连接RabbitMQ服务器成功" << endl;
vgetmsg.clear();
if (rb_client.consumer(duquename, vgetmsg, getnum, &tvb, err)<0)
{
cout << "连接RabbitMQ提示:" + err << endl;
return -1;
}
rb_client.__sleep(10);
rb_client.Disconnect();
return 0;
}
}
int main()
{
Vm Interface;
Interface.setConnectInfo("localhost", "5672", "guest", "guest");
Interface.Rab_Connect();
string temp = "测试机台信息发送";
Interface.Producer_Send(_JSONSTR_Machine_status, string_To_UTF8(temp));
Sleep(1000);
Interface.Rab_DisConnect();
vector<string> msg;
msg.clear();
Interface.Consumer_Rcv("Q.EQPT_LOG_D",msg);
if(msg.size()>0)
cout << "连接RabbitMQ接收到的信息为:" + UTF8_To_string(msg[0]) << endl;
int a = 0;
scanf("%d", &a);
return 0;
}
配置好属性后,编译通过,运行如下:
想在rabbitMQ服务端能看到,main函数程序中注释掉
int main()
{
Vm Interface;
Interface.setConnectInfo("localhost", "5672", "guest", "guest");
Interface.Rab_Connect();
string temp = "测试机台信息发送";
Interface.Producer_Send(_JSONSTR_Machine_status, string_To_UTF8(temp));
Sleep(1000);
Interface.Rab_DisConnect();
int a = 0;
scanf("%d", &a);
return 0;
}
点击Q.EQPT_LOG_D,查看刚刚创建的队列具体里面的具体内容 下拉点击Get Message就能显示刚刚发送的消息了。 好了,完成了。
3.结语
此文档只有调用程序,没有RabbitMQ c++封装的代码,由于本人也花费了很多精力,此封装程序暂没公开。程序在我主界面资源的下载,保证下载直接就能编译运行使用到各位的c++项目中,如实在不想用积分,请私信。
|