
CentOS + Qt + Kafka Development Environment Setup and Testing

This document records the steps for developing a Kafka program with Qt on CentOS. Installing the Qt IDE on CentOS is not covered here; it is assumed that the Qt build environment is already in place, and the focus is on configuring the Kafka build environment and walking through a test example.

  • Installing librdkafka

librdkafka is an open-source Kafka client library implemented in C/C++. It provides producer, consumer, and admin clients and is stable and high-performance: a producer can send on the order of a million messages per second, and a consumer can process on the order of three million messages per second.

  1. Download the librdkafka source from: GitHub - edenhill/librdkafka: The Apache Kafka C/C++ library
  2. Extract it to a directory of your choice and enter it: cd ./librdkafka-master/
  3. Run: chmod 777 configure lds-gen.py
  4. Run: ./configure
  5. Run: make
  6. Run: make install (with root privileges)

At this point librdkafka is installed.
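As a quick sanity check (a minimal sketch, not from the original article; it assumes the headers and library landed in the default /usr/local locations), the following console program simply prints the librdkafka version, confirming that the compiler can find rdkafkacpp.h and link against the library:

#include <iostream>
#include "rdkafkacpp.h"

int main() {
    // RdKafka::version_str() reports the runtime librdkafka version string
    std::cout << "librdkafka version: " << RdKafka::version_str() << std::endl;
    return 0;
}

If it builds and runs (for example with g++ main.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka) and prints a version string, the library is usable from C++.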

  • Installing the Kafka server
  1. Download the binary package (Apache Download Mirrors) and simply extract it; no installation is required.
  2. Download the JDK installer: https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
  3. Install the JDK first and configure the system environment variables; this is a prerequisite for running the Kafka service.
  4. Start the ZooKeeper service: .\windows\zookeeper-server-start.bat ..\config\zookeeper.properties
  5. Start the Kafka service: .\windows\kafka-server-start.bat ..\config\server.properties

The startup log shows the broker's configuration, including the listening port; the default port is:

port = 9092

  6. Create a topic: .\windows\kafka-topics.bat --zookeeper 127.0.0.1:2181 --create --topic demo-topic --partitions 2 --replication-factor 1

  7. Start a console producer: .\kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo

Producer publishing data

  8. Start a console consumer: .\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning

The --from-beginning flag makes the consumer read from the earliest offset in the topic; without it, consumption starts from the current position:

.\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo

Consumer receiving data
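As a side note (a hedged sketch, not part of the original article), the C++ consumer built later in this article controls the same behaviour through the topic-level auto.offset.reset property, which is attached to the consumer via default_topic_conf. The value earliest corresponds to --from-beginning and only applies when the consumer group has no committed offset yet:

#include <iostream>
#include <string>
#include "rdkafkacpp.h"

int main() {
    std::string errstr;
    // Topic-level configuration; "earliest" is the counterpart of --from-beginning
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (tconf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK)
        std::cerr << errstr << std::endl;
    // In the consumer program this tconf is passed to the global configuration
    // as "default_topic_conf" before the consumer is created.
    delete tconf;
    return 0;
}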

  • Developing producer and consumer programs with Qt
  1. Producer

Create a console-based Qt application and enter the following source code:

#include <QCoreApplication>
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <getopt.h>
#include "rdkafkacpp.h"

static bool run = true;

static void sigterm (int sig) {
    run = false;
}

class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb (RdKafka::Message &message) {
        std::cout << "Message delivery for (" << message.len() << " bytes): " << message.errstr() << std::endl;
        if (message.key())
            std::cout << "Key: " << *(message.key()) << ";" << std::endl;
    }
};

class ExampleEventCb : public RdKafka::EventCb {
public:
    void event_cb (RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
                run = false;
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    event.severity(), event.fac().c_str(), event.str().c_str());
            break;
        default:
            std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
            break;
        }
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

//  std::string brokers = "localhost";
    std::string brokers = "192.168.3.103";
    std::string errstr;
//  std::string topic_str = "test";
    std::string topic_str = "demo";
    int32_t partition = RdKafka::Topic::PARTITION_UA;

    RdKafka::Conf *conf  = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    conf->set("bootstrap.servers", brokers, errstr);

    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);

    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    ExampleDeliveryReportCb ex_dr_cb;
    conf->set("dr_cb", &ex_dr_cb, errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }
    std::cout << "% Created producer " << producer->name() << std::endl;

    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
    if (!topic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        exit(1);
    }

    // Read lines from stdin and produce each one as a message
    for (std::string line; run && std::getline(std::cin, line);) {
        if (line.empty()) {
            producer->poll(0);
            continue;
        }
        RdKafka::ErrorCode resp = producer->produce(topic, partition,
                                                    RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                    const_cast<char *>(line.c_str()), line.size(),
                                                    NULL, NULL);
        if (resp != RdKafka::ERR_NO_ERROR)
            std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl;
        else
            std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl;
        producer->poll(0);
    }

    run = true;

    // Drain the messages still in the output queue before exiting
    while (run && producer->outq_len() > 0) {
        std::cerr << "Waiting for " << producer->outq_len() << std::endl;
        producer->poll(1000);
    }

    delete conf;
    delete tconf;
    delete topic;
    delete producer;

    RdKafka::wait_destroyed(5000);

    return a.exec();
}

Compiling at this point will fail; the following configuration is needed.

Add the librdkafka libraries to the .pro file:

QMAKE_LFLAGS += -lrdkafka -lrdkafka++ -lz -lpthread -lrt

# -lrdkafka is equivalent to LIBS += /usr/local/lib/librdkafka.so

By default librdkafka installs its headers and libraries to:

/usr/local/include/librdkafka

/usr/local/lib

The directory containing librdkafka.so (/usr/local/lib/) therefore needs to be added to /etc/ld.so.conf:

Method 1:

echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig

Method 2:

vim /etc/ld.so.conf

Append the librdkafka library path (/usr/local/lib) at the end of the file, save, and then run:

ldconfig

If the build then fails with the following error:

:-1: error: cannot find -lz

the fix is to install the zlib development package:

yum install zlib-devel

At runtime the following error may appear in the log:

LOG-3-FAIL: [thrd:DESKTOP-VC32RT7:9092/0]: DESKTOP-VC32RT7:9092/0: Failed to resolve 'DESKTOP-VC32RT7:9092': Name or service not known (after 10013ms in state CONNECT, 3 identical error(s) suppressed)

This happens because, by default, the Kafka server does not advertise an externally reachable address. Open ./config/server.properties and make the following changes:

  1. Uncomment line 31 of the file:

listeners=PLAINTEXT://:9092

  2. Uncomment line 36 and change advertised.listeners to the server's IP address.

Before: advertised.listeners=PLAINTEXT://your.host.name:9092

After: advertised.listeners=PLAINTEXT://192.168.3.103:9092

Then restart the ZooKeeper and Kafka services and the console consumer. Build and run the producer program with Qt on the virtual machine; type a message in its terminal and press Enter, and the message is printed in the consumer window on the host machine.
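If connectivity is still in doubt, a small check run from the CentOS VM can help (a sketch under the assumptions used in this article: the broker is reachable at 192.168.3.103:9092). It creates a client handle and requests cluster metadata, which fails quickly while the listener configuration is still wrong:

#include <iostream>
#include <string>
#include "rdkafkacpp.h"

int main() {
    std::string errstr;
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    conf->set("bootstrap.servers", "192.168.3.103:9092", errstr);

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create handle: " << errstr << std::endl;
        return 1;
    }

    // Request metadata for all topics with a 5 second timeout.
    RdKafka::Metadata *metadata = NULL;
    RdKafka::ErrorCode err = producer->metadata(true, NULL, &metadata, 5000);
    if (err != RdKafka::ERR_NO_ERROR)
        std::cerr << "Metadata request failed: " << RdKafka::err2str(err) << std::endl;
    else
        std::cout << "Brokers: " << metadata->brokers()->size()
                  << ", topics: " << metadata->topics()->size() << std::endl;

    delete metadata;
    delete producer;
    delete conf;
    RdKafka::wait_destroyed(2000);
    return 0;
}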

Producer publishing a message

Consumer receiving the message

  2. Consumer

The previous example used the console consumer that ships with the Kafka distribution; this example builds a custom consumer program that subscribes to the messages published by the producer.

Create a console-based Qt application and enter the following source code:

#include <QCoreApplication>
#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <sys/time.h>
#include <getopt.h>
#include <unistd.h>
#include "rdkafkacpp.h"

static bool run = true;
static bool exit_eof = true;
static int eof_cnt = 0;
static int partition_cnt = 0;
static int verbosity = 1;
static long msg_cnt = 0;
static int64_t msg_bytes = 0;

static void sigterm (int sig) {
    run = false;
}

class ExampleEventCb : public RdKafka::EventCb {
public:
    void event_cb (RdKafka::Event &event) {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
                run = false;
            break;
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str());
            break;
        case RdKafka::Event::EVENT_THROTTLE:
            std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " << event.broker_name() << " id " << (int)event.broker_id() << std::endl;
            break;
        default:
            std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
            break;
        }
    }
};

void msg_consume(RdKafka::Message* message, void* opaque) {
    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
        //std::cerr << "RdKafka::ERR__TIMED_OUT" << std::endl;
        break;

    case RdKafka::ERR_NO_ERROR:
        /* Real message */
        msg_cnt++;
        msg_bytes += message->len();
        if (verbosity >= 3)
            std::cerr << "Read msg at offset " << message->offset() << std::endl;
        RdKafka::MessageTimestamp ts;
        ts = message->timestamp();
        if (verbosity >= 2 && ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
            std::string tsname = "?";
            if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)
                tsname = "create time";
            else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME)
                tsname = "log append time";
            std::cout << "Timestamp: " << tsname << " " << ts.timestamp << std::endl;
        }
        if (verbosity >= 2 && message->key()) {
            std::cout << "Key: " << *message->key() << std::endl;
        }
        if (verbosity >= 1) {
            printf("%.*s\n", static_cast<int>(message->len()), static_cast<const char *>(message->payload()));
        }
        break;

    case RdKafka::ERR__PARTITION_EOF:
        /* Last message */
        if (exit_eof && ++eof_cnt == partition_cnt) {
            std::cerr << "%% EOF reached for all " << partition_cnt << " partition(s)" << std::endl;
            run = false;
        }
        break;

    case RdKafka::ERR__UNKNOWN_TOPIC:
        break;

    case RdKafka::ERR__UNKNOWN_PARTITION:
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        run = false;
        break;

    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        run = false;
        break;
    }
}

class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
    void consume_cb (RdKafka::Message &msg, void *opaque) {
        msg_consume(&msg, opaque);
    }
};

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

//  std::string brokers = "localhost";
    std::string brokers = "192.168.3.103";
    std::string errstr;
//  std::string topic_str = "test";
    std::string topic_str = "demo";
    std::vector<std::string> topics;
    std::string group_id = "101";

    RdKafka::Conf *conf  = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    // group.id must be set for a KafkaConsumer
    if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << errstr << std::endl;
        exit(1);
    }

    topics.push_back(topic_str);

    // bootstrap.servers may also be given as metadata.broker.list
    conf->set("bootstrap.servers", brokers, errstr);

    ExampleConsumeCb ex_consume_cb;
    conf->set("consume_cb", &ex_consume_cb, errstr);

    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);

    conf->set("default_topic_conf", tconf, errstr);

    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        exit(1);
    }
    std::cout << "% Created consumer " << consumer->name() << std::endl;

    RdKafka::ErrorCode err = consumer->subscribe(topics);
    if (err) {
        std::cerr << "Failed to subscribe to " << topics.size() << " topics: " << RdKafka::err2str(err) << std::endl;
        exit(1);
    }

    while (run) {
        // If no message arrives within 5000 ms, consume() returns a
        // message carrying RdKafka::ERR__TIMED_OUT
        RdKafka::Message *msg = consumer->consume(5000);
        msg_consume(msg, NULL);
        delete msg;
    }

    consumer->close();

    delete conf;
    delete tconf;
    delete consumer;

    std::cerr << "% Consumed " << msg_cnt << " messages (" << msg_bytes << " bytes)" << std::endl;

    // Wait for rdkafka to release its resources before the application exits
    RdKafka::wait_destroyed(5000);

    return a.exec();
}

Add the librdkafka include and library paths to the .pro file; this time using the more conventional INCLUDEPATH/LIBS approach:

QT -= gui

CONFIG += c++11 console
CONFIG -= app_bundle

DEFINES += QT_DEPRECATED_WARNINGS

SOURCES += \
        main.cpp

# Default rules for deployment.
qnx: target.path = /tmp/$${TARGET}/bin
else: unix:!android: target.path = /opt/$${TARGET}/bin
!isEmpty(target.path): INSTALLS += target

INCLUDEPATH += /usr/local/include/librdkafka

LIBS += -L/usr/local/lib -lrdkafka
LIBS += -L/usr/local/lib -lrdkafka++

Build and run the consumer program.

Producer publishing a message:

Consumer receiving the message:

References:

  1. https://blog.csdn.net/caoshangpa/article/details/79786100
  2. https://www.cnblogs.com/alvingofast/p/kafka_deployment_on_windows.html
