IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 认识与简单使用kafka -> 正文阅读

[大数据]认识与简单使用kafka

kafka基本概念

kafka消息被消费了不会删除 会一直保存在存储消息的文件中 配置文件中有参数可以设置 这个文件多久删除一次

broker(读音 哺乳可 中文意思经纪人、代理人 在kafka中可以理解为 kafka的服务端 就是运行kafka作为消息队列逻辑的代码)

topic(读音 套白可 中文意思主题、题目、总论 在kafka中可以理解为消息存储的最大单位)

producer(读音 破丢色 中文意思制作人、制片人、生产者、发生器 在kafka中理解为消息的生产者)

consumer(读音 看搜磨 中文意思消费者、用户、顾客 在kafka中理解为 消息的消费者)

consumer group(读音 看搜磨 谷如破 中文意思消费者组 在kafka的世界中 它把每个consumer全都放在一个小组中 你如果没设置这个小组它默认也是有一个小组的)

partition(读音 破提神 中文意思划分、分开、分割等 在kafka中消息存储的最小单位 topic中包含partition partition代表一个文件 存储消息的文件)

kafka安装与使用

kafka是scala语言开发 这个语言跟java一样也要运行在jvm中 所以我们要安装jdk 安装jdk百度有一大堆方法这个不是主线

然后需要安装zookeeper 安装zookeeper之前写过

安装kafka

wget https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz # 2.11是scala的版本,2.4.1是kafka的版本
tar -xzf kafka_2.11-2.4.1.tgz
cd kafka_2.11-2.4.1

修改kafka配置文件config/server.properties

#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号 这个用来让生产者或消费者访问的地址
listeners=PLAINTEXT://192.168.65.60:9092
#kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181

启动服务

启动脚本语法:kafka-server-start.sh [-daemon] server.properties
可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。(注意,在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里,用vim /etc/hosts)

# 启动kafka,运行日志在logs目录的server.log文件里
bin/kafka-server-start.sh -daemon config/server.properties   #后台启动,不会打印日志到控制台
或者用
bin/kafka-server-start.sh config/server.properties &

# 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树
bin/zkCli.sh 
ls /		#查看zk的根目录kafka相关节点
ls /brokers/ids	#查看kafka节点

# 停止kafka
bin/kafka-server-stop.sh

创建主题

bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 1 --partitions 1 --topic test

查看主题

bin/kafka-topics.sh --list --zookeeper 192.168.65.60:2181

删除主题

bin/kafka-topics.sh --delete --topic test --zookeeper 192.168.65.60:2181

发送消息

bin/kafka-console-producer.sh --broker-list 192.168.65.60:9092 --topic test

执行完上面的命令会出现>这个 然后在这后面写的内容就是消息 写完了 按回车就可以发送

消费消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --topic test

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --from-beginning --topic test #这个命令的意思是从test的第一条消息开始消费

消费多个主题

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --whitelist “test|test-2”

单播消费
一条消息只能让同一个组中的一个消费者消费 消费者组会记一个当前主题的消息消费到哪了的偏移量 然后消费者消费消息时会找消费者组要消息的偏移量 然后根据这个偏移量去消费消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --consumer-property group.id=testGroup --topic test

多播消费
根据单播消费可以得出 不同的组可以消费同一条消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --consumer-property group.id=testGroup-2 --topic test

查看消费者组名

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.65.60:9092 --list

查看消费者组的消费偏移量

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.65.60:9092 --describe --group testGroup

current-offset:当前消费组的已消费偏移量
log-end-offset:主题对应分区消息的结束偏移量(HW)
lag:当前消费组未消费的消息数

topic跟partition

topic是某个小区的保安队队长 都能当队长了肯定人数大于1 假设队长的工作是管理队员其它的都不管 partition就是队员了 每个队长都有一个或者多个队员 在kafka中的topic包含partition partition跟队员一样 负责干活 干活的流程 老板找了保安的队长跟队长说接下来干什么什么 然后队长回来之后跟队员说 然后队员去干活 kafka中 生产者发消息这个消息就会找topic然后去找partition 然后把消息存到磁盘 partition可以跟磁盘看到一个东西 partition是消息在内存中的叫法 这个磁盘文件在哪根据配置文件中log.dir=/usr/local/data/kafka-logs这个属性设置的 可以有多个partition也就是说会有多个保存消息的文件 如果有多台kafka机器 这个多个文件会在不同的机器上 多台kafka机器只能有一台机器对这个文件进行修改 partition称为分区

创建多个分区的主题
bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 3 --partitions 2 --topic test1

–replication-factor 3 // 表示分区之后的文件再备份3分

–partitions 2 // 表示给主题设置两个分区

查看topic情况

bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:2181 --topic test1

leader节点负责给定partition的所有读写请求。
replicas 表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
isr 是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。

给主题扩容

bin/kafka-topics.sh -alter --partitions 3 --zookeeper 192.168.65.60:2181 --topic test

kafka集群
机器1用上面写的哪个server.properties配置文件
机器2
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.65.60:9093
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.65.60:2181

机器3
broker.id=2
listeners=PLAINTEXT://192.168.65.60:9094
log.dir=/usr/local/data/kafka-logs-2
zookeeper.connect=192.168.65.60:2181

启动2跟3机器
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

kafka做集群只要zookeeper连接地址相同 它们就可以自行的互相发现 kafka集群是一个无状态的链接 topic会有状态 去zookeeper可以看到kafka配置的一些信息brokers这个节点可以看到集群的状态等 kafka没有单机这一说 kafka生来就是集群状态 kafka一台机器也可以说是一个集群

创建新的topic

bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

查看topic状态

bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:2181 --topic my-replicated-topic

发送消息

bin/kafka-console-producer.sh --broker-list 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094 --topic my-replicated-topic

消费消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094 --from-beginning --topic my-replicated-topic

停止机器1

ps -ef | grep server.properties

kill 进程号

再次查看topic状态

bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:9092 --topic my-replicated-topic

发送消费机制

Producers
生产者将消息发送到topic中去,同时负责选择将message发送到topic的哪一个partition中。通过round-robin做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。

Consumers
消费者只有两种消息传递模式 队列模式 跟 发布订阅模式

队列模式就是同一个消息只能被消费者组中的一个消费者消费 这样的一个东西

发布订阅模式就是同一个消息可以被多个消费者组进行消费

顺序消费

topic创建一个分片就可以实现顺序消费

java客户端

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.4.1</version>
</dependency>

kafka-clients版本根kafka的版本一样就可以

消息发送者代码

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class MsgProducer {
    private final static String TOPIC_NAME = "my-replicated-topic";

public static void main(String[] args) throws InterruptedException, ExecutionException {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
     /*
     发出消息持久化机制参数
    (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
    (2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一
         条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
    (3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证
        只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
     */
    /*props.put(ProducerConfig.ACKS_CONFIG, "1");
     *//*
    发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在
    接收者那边做好消息接收的幂等性处理
    *//*
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    //重试间隔设置
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
    //设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    *//*
    kafka本地线程会从缓冲区取数据,批量发送到broker,
    设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
    *//*
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    *//*
    默认值是0,意思就是消息必须立即被发送,但这样会影响性能
    一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去
    如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
    *//*
    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);*/
    //把发送的key从字符串序列化为字节数组
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    //把发送消息value从字符串序列化为字节数组
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<String, String> producer = new KafkaProducer<String, String>(props);

    int msgNum = 5;
    final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
    for (int i = 1; i <= msgNum; i++) {
        Order order = new Order(i, 100 + i, 1, 1000.00);
        //指定发送分区
        /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                , 0, order.getOrderId().toString(), JSON.toJSONString(order));*/
        //未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                , order.getOrderId().toString(), JSON.toJSONString(order));

        //等待消息发送成功的同步阻塞方法
        /*RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                + metadata.partition() + "|offset-" + metadata.offset());*/

        //异步回调方式发送消息
        producer.send(producerRecord, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println("发送消息失败:" + exception.getStackTrace());

                }
                if (metadata != null) {
                    System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                            + metadata.partition() + "|offset-" + metadata.offset());
                }
                countDownLatch.countDown();
            }
        });

        //送积分 TODO

    }

    countDownLatch.await(5, TimeUnit.SECONDS);
    producer.close();
}
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-12 16:40:01  更:2021-08-12 16:40:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 21:08:26-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码