IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafk安装、应用及系统架构 -> 正文阅读

[大数据]Kafk安装、应用及系统架构

一、环境搭建

1.1 安装包及环境准备

zookeeper:3.7.0  (apache-zookeeper-3.7.0-bin.tar.gz)
kafka:2.8.0  (kafka_2.13-2.8.0.tgz)
jdk:jdk-8u301  (jdk-8u301-linux-x64.tar.gz)

zookeeper、kafka均采用集群部署,主机采用centos7.9,环境信息如下:

主机名ip应用部署
node1192.168.206.201zookeeper、kafka
node1192.168.206.202zookeeper、kafka
node1192.168.206.203zookeeper、kafka

hosts文件配置如下:

$ vi /etc/hosts

192.168.206.201 node1
192.168.206.202 node2
192.168.206.203 node3

1.2 JDK安装

服务器上解压安装包,配置环境变量,不赘述,可参照如下:(3台服务器均需操作)

$ tar -xzvf jdk-8u301-linux-x64.tar.gz -C /usr/local/

$ vi .bash_profile
  PATH=$PATH:$HOME/bin
  export PATH
  export JAVA_HOME=/usr/local/jdk1.8.0_301
  export PATH=$JAVA_HOME/bin:$PATH

$ source .bash_profile

测试:

$ java -version

输入如下:
java version "1.8.0_301"
Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)

1.3 Zookeeper安装

服务器上解压并安装

$ tar -xzvf apache-zookeeper-3.7.0-bin.tar.gz

$ cd apache-zookeeper-3.7.0-bin/conf/
$ vi zoo.cfg
  tickTime=2000
  dataDir=/data/zookeeper
  clientPort=2181
  initLimit=5
  syncLimit=2
  server.1=node1:2888:3888
  server.2=node2:2888:3888
  server.3=node3:2888:3888

$ mkdir -p /data/zookeeper
$ vi /data/zookeeper/myid
  1
  ##此文件内写入zoo.cfg文件中配置的人最后三行server.序号=IP:2888:3888 对应的序号,如node1则为1 node2则为2

依次在三台服务器上配置,注意上述的myid文件中的序号按照zoo.cfg中的序号配置。
启动zk:

$ bin/zkServer.sh start

启动成功:
ZooKeeper JMX enabled by default
Using config: /root/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

客户端测试:

$ bin/zkCli.sh -server 127.0.0.1:2181

进入zk交互终端,执行测试命令
[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[zookeeper]

1.4 Kafka安装

服务器解压并安装:

$ tar -xzvf kafka_2.13-2.8.0.tgz
$ cd kafka_2.13-2.8.0
$ vi config/server.properties

broker.id=1     #每台服务器唯一且不冲突,用于区分不同的broker
log.dirs=/data/kafka #修改目录,此目录为kafka日志数据文件的存储路径
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka    

启动服务:

$ bin/kafka-server-start.sh config/server.properties

测试:

查看zookeeper集群数据:
ls /kafka/brokers/ids
输出结果:
[1, 2, 3]

二、基本命令使用

2.1 创建Topic

$ bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --create  --topic hello-topic --partitions 3 --replication-factor 3

–topic hello-topic:topic主题名称(例:hello-topic为topic名称)
–partitions 3:设置分区数量(例:该topic有3个分区)
–replication-factor 3:副本因子(例: 该topic下每个分区有3个副本备份)

2.2 列出所有Topic

$ bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
将会显示所有的topic列表

2.3 获取某个Topic的详细信息

$ bin/kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --topic hello-topic
输出结果如下:(将会展示分区信息,分区主节点信息,副本节点信息,ISR节点信息)
Topic: hello-topic	TopicId: uX53uX8dSDW-qP-0KKreDg	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824
Topic: hello-topic	Partition: 0	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2
Topic: hello-topic	Partition: 1	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3
Topic: hello-topic	Partition: 2	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1

说明
Partition: 0 此处的0代表分区编号
Leader: 1 此分区的主节点信息
Replicas: 1,3,2 此分区的副本节点信息
Isr: 1,3,2 当前可用的ISR节点信息
可参照此文章进一步了解ISR机制

2.4 发送消息

$ bin/kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic hello-topic
进入交互终端可在线发送消息到topic
>This is my first event
>This is my second event
>This is my third event
>

2.5 消费消息

$ bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic hello-topic --from-beginning
将会实时输出消息结果:
This is my first event
This is my third event
This is my second event

说明
以下选项可以定义消息的消费开始位置:
–from-beginning:如果使用者尚未建立要使用的偏移量,则从日志中出现的最早消息开始
–offset :要从中使用的偏移量id(非负数的数字),或字符“earliest”表示从开始,或“latest”表示从结束(默认值:最新。如果使用offset偏移量来精确指定消息位置,则需要指定–partition选项,明确指定分区信息
例如:
$ bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic hello-topic --offset 11 --partition 2
–group:可指定group消费分组

2.6 获取消费者Group列表

$  bin/kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --all-groups --list

2.7 获取消费者Group详细信息

$  bin/kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --group hello-group --describe
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST             CLIENT-ID
hello-group     hello-topic     0          14              14              0               consumer-hello-group-1-cc2ea682-4bb7-41a0-b87c-5970522de997 /192.168.206.201 consumer-hello-group-1
hello-group     hello-topic     1          19              19              0               consumer-hello-group-1-cc2ea682-4bb7-41a0-b87c-5970522de997 /192.168.206.201 consumer-hello-group-1
hello-group     hello-topic     2          24              24              0               consumer-hello-group-1-cc2ea682-4bb7-41a0-b87c-5970522de997 /192.168.206.201 consumer-hello-group-1

*说明*
CURRENT-OFFSET:该group对当前分区的消费偏移量
LOG-END-OFFSET:当前分区的偏移量,即已提交的最大消息偏移量(CURRENT-OFFSET <= LOG-END-OFFSET)

2.8 查看当前topic的消息总数

$ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092,node3:9092,node2:9092 --topic hello-topic  --time -1
输出结果如下:
hello-topic:0:20
hello-topic:1:27
hello-topic:2:29

说明
通常采用 kafka-consumer-groups.sh命令查看LOG-END-OFFSET即可,如若没有消费者的情况下可使用此命令

三、SDK使用

3.1 工程Maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

3.2 生产者Demo

package com.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Properties;

public class ProducerDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerDemo.class);

    public static void main(String[] args) throws IOException {
        String topic = "hello-topic";
        String bootstrapServer = "node1:9092,node2:9092,node3:9092";

        Properties properties = new Properties();
        //broker连接信息
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        //消息Key序列化器
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //消息Value序列化器
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //Ack响应 0:不等待broker的ack响应,即本地开始socket即返回,1:broker主节点写入成功即返回,不保证副本成功,-1:broker主节点及ISR集合内至少有1个副本写入成功才返回(分布式环境建议使用此模式)
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 3; j++) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, j, j + "", j + "-" + i + "-msg");
                producer.send(record, (metadata, exception) -> {
                    int partition = metadata.partition();
                    long offset = metadata.offset();
                    LOGGER.debug("partition:{},offset:{},key:{},value:{}!", partition, offset, record.key(), record.value());
                    if (exception != null) {
                        LOGGER.error("error!", exception);
                    }
                });
            }
        }
        System.in.read();
    }
}

Demo模拟了key、value均为String形式的消息发送,并采用明确指定分区的方式连续发送数据。

参数中:需要关注ack的参数设置
0:不等待broker的ack响应,即本地开始socket即返回
1:broker主节点写入成功即返回,不保证副本成功
-1:broker主节点及ISR集合内至少有1个副本写入成功才返回(分布式环境建议使用此模式)

3.3 消费者Demo

package com.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.*;

public class ConsumerDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerDemo.class);

    public static void main(String[] args) throws IOException {
        String topic = "hello-topic";
        String bootstrapServer = "node1:9092,node2:9092,node3:9092";
        String groupId = "consumer-java-group";

        Properties properties = new Properties();
        //broker连接信息
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        //消息Key反序列化器
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //消息Value反序列化器
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //消费组ID
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //设置自动提交为true,则按照AUTO_COMMIT_INTERVAL_MS_CONFIG参数,进行消费offset的自动提交
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
        //自动提交消费offset的时间间隔
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        //该group消息消费的offset设置
        //earliest:如果Kafka中没有初始偏移量,或者如果服务器上不再存在当前偏移量(例如,因为该数据已被删除),自动将偏移量重置为最早偏移量
        //latest:自动将偏移量重置为最新偏移量
        //none:如果没有,则向使用者抛出异常为消费者组找到上一个偏移量
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //可监听分区信息的变化
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                partitions.forEach(e -> LOGGER.info("onPartitionsRevoked -> topic:{},partition:{}", e.topic(), e.partition()));
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                partitions.forEach(e -> LOGGER.info("onPartitionsAssigned -> topic:{},partition:{}", e.topic(), e.partition()));
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000L));
            //按照分区读取消息
            records.forEach(r -> {
                LOGGER.debug("partition:{},offset:{},key:{},value:{}", r.partition(), r.offset(), r.key(), r.value());
            });
        }
    }
}

关于offset
Consumer的offset可自行编码采用第三方维护,如数据库、redis等,在消费开始时,根据自己维护的信息,使用Consumer的seek(TopicPartition partition, long offset)函数进行重置,开始消费。在没有特殊业务需求的情况下,则使用kafka的自动保存机制即可,其原理是kafka中维护了__consumer_offsets的topic,用来保存consume的不同group的offset,默认5秒提交一次。当然也可以使用非自动提交机制,显式的调用提交函数,commitAsync()或commitSync()。
关于offset可进一步参考文章

四、系统架构

//todo
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-06 11:13:37  更:2021-09-06 11:14:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 13:50:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码