| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> CentOS+QT+KAFKA开发环境部署及测试 -> 正文阅读 |
|
[大数据]CentOS+QT+KAFKA开发环境部署及测试 |
CentOS+QT+KAFKA开发环境部署及测试 本文档记录了在CentOS环境下通过QT开发KAFKA程序的步骤,关于CentOS中安装QT集成开发环境,不再赘述。此处默认是在QT编译环境已经完备的情况下,如何配置KAFKA的编译环境及测试实例演示。
librdkafa是一个开源的KAFKA客户端,由C/C++实现,提供了生产者、消费者、管理者的客户端,是一款稳定的、高性能的消息中间件。对于生产者每秒可发送百万量级的消息数,对于消费者每秒可以消费掉3百万量级的消息数。
(以管理员用户权限) 至此librdkafka安装完毕
上述log中显示了kafka的信息,如密码、端口号等,默认端口号: port = 9092
7、启动生产者指令:.\kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo 生产者发布数据
参数:--from-beginning表示游标从头开始,不加此参数则从当前位置开始。 .\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo 消费者接收数据
创建基于控制台的Qt应用程序,键入源代码: #include <QCoreApplication> #include <iostream> #include <string> #include <cstdlib> #include <cstdio> #include <csignal> #include <cstring> #include <getopt.h> #include "rdkafkacpp.h" static bool run = true; static void sigterm (int sig) { ??? run = false; } class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb { public: ??? void dr_cb (RdKafka::Message &message) { ??????? std::cout << "Message delivery for (" << message.len() << " bytes): " <<message.errstr() << std::endl; ??????? if (message.key()) ??????????? std::cout << "Key: " << *(message.key()) << ";" << std::endl; ??? } }; class ExampleEventCb : public RdKafka::EventCb { public: ??? void event_cb (RdKafka::Event &event) { ??????? switch (event.type()) ??????? { ??????? case RdKafka::Event::EVENT_ERROR: ??????????? std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl; ??????????? if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) ??????????????? run = false; ??????????? break; ??????? case RdKafka::Event::EVENT_STATS: ??????????? std::cerr << "\"STATS\": " << event.str() << std::endl; ??????????? break; ???? ???case RdKafka::Event::EVENT_LOG: ??????????? fprintf(stderr, "LOG-%i-%s: %s\n", ??????????????????? event.severity(), event.fac().c_str(), event.str().c_str()); ??????????? break; ??????? default: ??????????? std::cerr << "EVENT " << event.type() <<" (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl; ??????????? break; ??????? } ??? } }; int main(int argc, char *argv[]) { ??? QCoreApplication a(argc, argv); //??? std::string brokers = "localhost"; ??? std::string brokers = "192.168.3.103"; ??? std::string errstr; //??? std::string topic_str="test"; ??? std::string topic_str="demo"; ??? int32_t partition = RdKafka::Topic::PARTITION_UA; ??? RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); ??? RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); ??? conf->set("bootstrap.servers", brokers, errstr); ??? ExampleEventCb ex_event_cb; ??? conf->set("event_cb", &ex_event_cb, errstr); ??? signal(SIGINT, sigterm); ??? signal(SIGTERM, sigterm); ??? ExampleDeliveryReportCb ex_dr_cb; ??? conf->set("dr_cb", &ex_dr_cb, errstr); ??? RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); ??? if (!producer) { ??????? std::cerr << "Failed to create producer: " << errstr << std::endl; ? ??????exit(1); ??? } ??? std::cout << "% Created producer " << producer->name() << std::endl; ??? RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr); ??? if (!topic) { ??????? std::cerr << "Failed to create topic: " << errstr << std::endl; ??????? exit(1); ??? } ??? for (std::string line; run && std::getline(std::cin, line);) { ??????? if (line.empty()) { ??????????? producer->poll(0); ??????????? continue; ??????? } ??????? RdKafka::ErrorCode resp = producer->produce(topic, partition, ??????????????????????????????????????????????????? RdKafka::Producer::RK_MSG_COPY /* Copy payload */, ??????????????????????????????????????????????????? const_cast<char *>(line.c_str()), line.size(), ????????????????????????????????????? ??????????????NULL, NULL); ??????? if (resp != RdKafka::ERR_NO_ERROR) ??????????? std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl; ??????? else ??????????? std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl; ??????? producer->poll(0); ??? } ??? run = true; ??? // 退出前处理完输出队列中的消息 ??? while (run && producer->outq_len() > 0) { ??????? std::cerr << "Waiting for " << producer->outq_len() << std::endl; ??????? producer->poll(1000); ??? } ??? delete conf; ??? delete tconf; ??? delete topic; ??? delete producer; ??? RdKafka::wait_destroyed(5000); ??? return a.exec(); } 此时直接编译会报错,需做如下配置。 在.pro文件中添加引入librdkafka的库文件, QMAKE_LFLAGS += -lrdkafka -lrdkafka++-lz -lpthread -lrt #-lrdkafka等价于 LIBS += /usr/local/lib/librdkafka.so? 由于默认情况下librdkafka的头文件和库文件安装路径为: /usr/local/include/librdkafka /usr/local/lib?? 此时需要在/etc/ld.so.conf文件中加入librdkafka.so所在的目录:/usr/local/lib/ 方法一: echo "/usr/local/lib" >> /etc/ld.so.conf 方法二: vim /etc/ld.so.conf ldconfig 然后在文件尾部添加librdkafka库的路径 编译出现如下错误 :-1: error: cannot find –lz 解决办法: yum install zlib-devel 运行时错误日志: LOG-3-FAIL: [thrd:DESKTOP-VC32RT7:9092/0]: DESKTOP-VC32RT7:9092/0: Failed to resolve 'DESKTOP-VC32RT7:9092': 未知的名称或服务 (after 10013ms in state CONNECT, 3 identical error(s) suppressed) 原因是默认情况下,kafka服务器不允许外部访问,打开./config/server.properties配置文件,更改如下:
listeners=PLAINTEXT://:9092
修改前:advertised.listeners=PLAINTEXT://your.host.name:9092 修改后:advertised.listeners=PLAINTEXT://192.168.3.103:9092 然后重新启动zookeeper和kafka服务和消费者。在虚拟机上通过Qt编译运行生产者程序,可以在终端中输入消息内容,并回车,就可以在宿主即的消费者窗口打印输出消息了。 生产者发布消息 消费者订阅消息
上例中使用的消费者为librdkafka中自带的消费者客户端,本例将自己创建消费者程序,订阅来自客户端的消息。 创建基于控制台的Qt应用程序,键入源代码: #include <QCoreApplication> #include <iostream> #include <string> #include <cstdlib> #include <cstdio> #include <csignal> #include <cstring> #include <sys/time.h> #include <getopt.h> #include <unistd.h> #include "rdkafkacpp.h" static bool run = true; static bool exit_eof = true; static int eof_cnt = 0; static int partition_cnt = 0; static int verbosity = 1; static long msg_cnt = 0; static int64_t msg_bytes = 0; static void sigterm (int sig) { ??? run = false; } class ExampleEventCb : public RdKafka::EventCb { public: ??? void event_cb (RdKafka::Event &event) { ??????? switch (event.type()) ??????? { ??????? case RdKafka::Event::EVENT_ERROR: ??????????? std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl; ??????????? if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) ??????????????? run = false; ??????????? break; ??????? case RdKafka::Event::EVENT_STATS: ??????????? std::cerr << "\"STATS\": " << event.str() << std::endl; ??????????? break; ??????? case RdKafka::Event::EVENT_LOG: ??????????? fprintf(stderr, "LOG-%i-%s: %s\n",event.severity(), event.fac().c_str(), event.str().c_str()); ??????????? break; ??????? case RdKafka::Event::EVENT_THROTTLE: ??????????? std::cerr << "THROTTLED: " << event.throttle_time() << "ms by "<<event.broker_name() << " id " << (int)event.broker_id() << std::endl; ??????????? break; ??????? default: ??????????? std::cerr << "EVENT " << event.type() <<" (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl; ??????????? break; ??????? } ??? } }; void msg_consume(RdKafka::Message* message, void* opaque) { ??? switch (message->err()) { ??? case RdKafka::ERR__TIMED_OUT: ??????? //std::cerr << "RdKafka::ERR__TIMED_OUT"<<std::endl; ??????? break; ??? case RdKafka::ERR_NO_ERROR: ??????? /* Real message */ ??????? msg_cnt++; ??????? msg_bytes += message->len(); ??????? if (verbosity >= 3) ??????????? std::cerr << "Read msg at offset " << message->offset() << std::endl; ??????? RdKafka::MessageTimestamp ts; ??????? ts = message->timestamp(); ??????? if (verbosity >= 2 &&ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) { ??????????? std::string tsname = "?"; ??????????? if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) ??????????????? tsname = "create time"; ??????????? else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) ??????????????? tsname = "log append time"; ??????????? std::cout << "Timestamp: " << tsname << " " << ts.timestamp << std::endl; ??????? } ??????? if (verbosity >= 2 && message->key()) { ??????????? std::cout << "Key: " << *message->key() << std::endl; ??????? } ??????? if (verbosity >= 1) { ??????????? printf("%.*s\n",static_cast<int>(message->len()),static_cast<const char *>(message->payload())); ??????? } ??????? break; ??? case RdKafka::ERR__PARTITION_EOF: ??????? /* Last message */ ??????? if (exit_eof && ++eof_cnt == partition_cnt) { ??????????? std::cerr << "%% EOF reached for all " << partition_cnt <<" partition(s)" << std::endl; ??????????? run = false; ??????? } ??????? break; ??? case RdKafka::ERR__UNKNOWN_TOPIC: ??????? break; ??? case RdKafka::ERR__UNKNOWN_PARTITION: ??????? std::cerr << "Consume failed: " << message->errstr() << std::endl; ??????? run = false; ? ??????break; ??? default: ??????? /* Errors */ ??????? std::cerr << "Consume failed: " << message->errstr() << std::endl; ??????? run = false; ??????? break; ??? } } class ExampleConsumeCb : public RdKafka::ConsumeCb { public: ??? void consume_cb (RdKafka::Message &msg, void *opaque) { ??????? msg_consume(&msg, opaque); ??? } }; int main(int argc, char *argv[]) { ??? QCoreApplication a(argc, argv); //??? std::string brokers = "localhost"; ??? std::string brokers = "192.168.3.103"; ??? std::string errstr; //??? std::string topic_str="test"; ??? std::string topic_str="demo"; ??? std::vector<std::string> topics; ??? std::string group_id="101"; ??? RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); ??? RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); ??? //group.id必须设置 ??? if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) { ??????? std::cerr << errstr << std::endl; ??????? exit(1); ??? } ??? topics.push_back(topic_str); ??? //bootstrap.servers可以替换为metadata.broker.list ??? conf->set("bootstrap.servers", brokers, errstr); ??? ExampleConsumeCb ex_consume_cb; ??? conf->set("consume_cb", &ex_consume_cb, errstr); ??? ExampleEventCb ex_event_cb; ??? conf->set("event_cb", &ex_event_cb, errstr); ??? conf->set("default_topic_conf", tconf, errstr); ??? signal(SIGINT, sigterm); ??? signal(SIGTERM, sigterm); ??? RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr); ??? if (!consumer) { ??????? std::cerr << "Failed to create consumer: " << errstr << std::endl; ??????? exit(1); ??? } ??? std::cout << "% Created consumer " << consumer->name() << std::endl; ??? RdKafka::ErrorCode err = consumer->subscribe(topics); ??? if (err) { ??????? std::cerr << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl; ??????? exit(1); ??? } ??? while (run) { ??? //5000毫秒未订阅到消息,触发RdKafka::ERR__TIMED_OUT ??? RdKafka::Message *msg = consumer->consume(5000); ??? msg_consume(msg, NULL); ??? delete msg; ??? } ??? consumer->close(); ??? delete conf; ??? delete tconf; ??? delete consumer; ??? std::cerr << "% Consumed " << msg_cnt << " messages ("<< msg_bytes << " bytes)" << std::endl; ??? //应用退出之前等待rdkafka清理资源 ??? RdKafka::wait_destroyed(5000); ??? return a.exec(); } 在.pro文件中添加librdkafka库路径,此处演示了另一种更常规的添加库路径的方式: QT -= gui CONFIG += c++11 console CONFIG -= app_bundle DEFINES += QT_DEPRECATED_WARNINGS SOURCES += \ ??????? main.cpp # Default rules for deployment. qnx: target.path = /tmp/$${TARGET}/bin else: unix:!android: target.path = /opt/$${TARGET}/bin !isEmpty(target.path): INSTALLS += target INCLUDEPATH += /usr/local/include/librdkafka LIBS += -L/usr/local/lib -lrdkafka LIBS += -L/usr/local/lib -lrdkafka++ 编译并运行消费者程序, 生产者发布消息: 消费者订阅消息: 参考来源:
|
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年3日历 | -2025/3/4 3:30:42- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |