IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka安装以及验证 -> 正文阅读

[大数据]Kafka安装以及验证

1 Kafka安装

1.1 下载安装

到官网http://kafka.apache.org/downloads.html下载想要的版本

注:由于Kafka控制台脚本对于基于UnixWindows的平台是不同的,因此在Windows平台上使用bin\windows\ 而不是bin/ 将脚本扩展名更改为.bat。

[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
[root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz
[root@along ~]# cd /data/kafka_2.11-2.1.0/

1.2 配置启动zookeeper

kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的;所以需要配置启动zookeeper服务。
点击了解zookeeper安装步骤

1.3 配置kafka

1.3.1 修改配置文件

[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties
broker.id=0  
listeners=PLAINTEXT://localhost:9092  
num.network.threads=3  
num.io.threads=8  
socket.send.buffer.bytes=102400  
socket.receive.buffer.bytes=102400  
socket.request.max.bytes=104857600  
log.dirs=/tmp/kafka-logs
num.partitions=1  
num.recovery.threads.per.data.dir=1  
offsets.topic.replication.factor=1  
transaction.state.log.replication.factor=1  
transaction.state.log.min.isr=1  
log.retention.hours=168  
log.segment.bytes=1073741824  
log.retention.check.interval.ms=300000  
zookeeper.connect=localhost:2181  
zookeeper.connection.timeout.ms=6000  
group.initial.rebalance.delay.ms=0

注:可根据自己需求修改配置文件

broker.id:#唯一标识ID
listeners=PLAINTEXT://localhost:9092:#kafka服务监听地址和端口
log.dirs:#日志存储目录
zookeeper.connect:#指定zookeeper服务

1.3.2 配置环境变量

[root@along ~]# vim /etc/profile.d/kafka.sh  
export KAFKA_HOME="/data/kafka_2.11-2.1.0"  
export PATH="${KAFKA_HOME}/bin:$PATH"  
[root@along ~]# source /etc/profile.d/kafka.sh

1.3.3 配置服务启动脚本

[root@along ~]# vim /etc/init.d/kafka
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#
   
source /etc/rc.d/init.d/functions  
   
KAFKA_HOME=/data/kafka_2.11-2.1.0
KAFKA_USER=root
export LOG_DIR=/tmp/kafka-logs
   
[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka
   
# See how we were called.
case "$1" in  
   
  start)
    echo -n "Starting Kafka:"  
    /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"  
    echo " done."  
    exit 0
    ;;
   
  stop)
    echo -n "Stopping Kafka: "  
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill \-9"  
    echo " done."  
    exit 0
    ;;
  hardstop)
    echo -n "Stopping (hard) Kafka: "  
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"  
    echo " done."  
    exit 0
    ;;
   
  status)
    c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
    if [ "$c_pid" = "" ] ; then  
      echo "Stopped"  
      exit 3
    else  
      echo "Running $c_pid"  
      exit 0
    fi  
    ;;
   
  restart)
    stop
    start
    ;;
   
  *)
    echo "Usage: kafka {start|stop|hardstop|status|restart}"  
    exit 1
    ;;
   
esac

1.3.4 启动kafka服务

后台启动zookeeper服务

[root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &

启动kafka服务

[root@along ~]# service kafka start  
Starting kafka (via systemctl): [ OK ]  
[root@along ~]# service kafka status  
Running 86018  
[root@along ~]# ss -nutl  
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                                
tcp   LISTEN     0      50                    :::9092                              :::*
tcp   LISTEN     0      50                    :::2181                              :::*

1.4 kafka使用简单入门

1.4.1 创建主题topics

创建一个名为along的主题,它只包含一个分区,只有一个副本:

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along
Created topic "along".

如果我们运行list topic命令,我们现在可以看到该主题:

[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181  
along

1.4.2 发送一些消息

Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
>This is a message
>This is another message

1.4.3 启动消费者

Kafka还有一个命令行使用者,它会将消息转储到标准输出。

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning
This is a message
This is another message

所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。

1.5 设置多代理kafka群集

到目前为止,我们一直在与一个broker运行,但这并不好玩。对于Kafka,单个代理只是一个大小为1的集群,因此除了启动一些代理实例之外没有太多变化。但是为了感受它,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)。

1.5.1 准备配置文件

[root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties
[root@along kafka_2.11-2.1.0]# vim config/server-1.properties
    broker.id=1  
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1  
[root@along kafka_2.11-2.1.0]# vim config/server-2.properties
    broker.id=2  
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2

注:该broker.id 属性是群集中每个节点的唯一且永久的名称。我们必须覆盖端口和日志目录,因为我们在同一台机器上运行这些,并且我们希望让所有代理尝试在同一端口上注册或覆盖彼此的数据。

1.5.2 开启集群另2个kafka服务

[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &  
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &  
[root@along ~]# ss -nutl  
Netid State      Recv-Q Send-Q     Local Address:Port                    Peer Address:Port                            
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9092                              :::*
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9093                              :::*
tcp   LISTEN     0      50      ::ffff:127.0.0.1:9094                              :::*

1.5.3 在集群中进行操作

现在创建一个复制因子为3的新主题my-replicated-topic

[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

在一个集群中,运行describe topics命令查看哪个broker正在做什么

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
    Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

#注释:第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们只有一个分区用于此主题,因此只有一行。
#“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
#“replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
#“isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。
#请注意,Leader: 2,在我的示例中,节点2 是该主题的唯一分区的Leader。

可以在我们创建的原始主题上运行相同的命令,以查看它的位置

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along  
Topic:along PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: along    Partition: 0 Leader: 0 Replicas: 0 Isr: 0

向我们的新主题发布一些消息:

[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2

现在让我们使用这些消息:

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

1.5.4 测试集群的容错性

现在让我们测试一下容错性。Broker 2 充当leader 所以让我们杀了它:

[root@along ~]# ps aux | grep server-2.properties |awk '{print $2}'
106737  
[root@along ~]# kill -9 106737
[root@along ~]# ss -nutl
tcp LISTEN 0      50      ::ffff:127.0.0.1:9092                              :::*                         
tcp LISTEN 0      50      ::ffff:127.0.0.1:9093                              :::*

leader 已切换到其中一个从属节点,节点2不再位于同步副本集中:

[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic  
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1

即使最初接受写入的leader 已经失败,这些消息仍可供消费:

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2

1.6 使用Kafka Connect导入/导出数据

从控制台写入数据并将其写回控制台是一个方便的起点,但有时候可能希望使用其他来源的数据或将数据从Kafka导出到其他系统。对于许多系统,您可以使用Kafka Connect导入或导出数据,而不是编写自定义集成代码。

Kafka ConnectKafka附带的工具,用于向Kafka导入和导出数据。它是一个可扩展的工具,运行连接器,实现与外部系统交互的自定义逻辑。我们将了解如何使用简单的连接器运行Kafka Connect,这些连接器将数据从文件导入Kafka主题并将数据从Kafka主题导出到文件。

首先创建一些种子数据进行测试:

[root@along ~]# echo -e "foo\nbar" > test.txt
或者在Windows上:
> echo foo> test.txt
> echo bar>> test.txt

接下来,启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。提供三个配置文件作为参数。

第一个始终是Kafka Connect流程的配置,包含常见配置,例如要连接的Kafka代理和数据的序列化格式。

其余配置文件均指定要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的任何其他配置。

[root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
[2019-01-16 16:16:31,903] INFO WorkerInfo values:  
... ...

#注:Kafka附带的这些示例配置文件使用您之前启动的默认本地群集配置并创建两个连接器:第一个是源连接器,它从输入文件读取行并生成每个Kafka主题,第二个是宿连接器从Kafka主题读取消息并将每个消息生成为输出文件中的一行。

验是否导入成功(另起终端)
在启动过程中,您将看到许多日志消息,包括一些指示正在实例化连接器的日志消息。

  • 一旦Kafka Connect进程启动,源连接器应该开始从test.txt主题读取行并将其生成到主题connect-test,并且接收器连接器应该开始从主题读取消息connect-test 并将它们写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传递:
[root@along ~]# cat test.sink.txt  
foo  
bar

请注意,数据存储在Kafka主题中connect-test,因此我们还可以运行控制台使用者来查看主题中的数据(或使用自定义使用者代码来处理它):

[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

继续追加数据,验证

[root@along ~]# echo Another line>> test.txt
[root@along ~]# cat test.sink.txt
foo
bar
Another line
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-24 09:31:36  更:2022-04-24 09:32:21 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:57:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码