IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka消息队列 -> 正文阅读

[大数据]kafka消息队列

kafka介绍

简介:kafka是一个事件流平台,专门为分布式高吞吐量系统而设计的消息传递系统,相对其他消息系统有更好的吞吐量、内置分区、复制和固有的容错能留,使其更适合处理大规模的消息程序。

kafka的两种模式

  • 点对点:消息被保留在队列中。一个或者多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。消息被消费后将会从队列消失。

在这里插入图片描述

  • 发布-订阅(pub-sub)消息系统:不同于点对点的是消息被保留在主题中,消费者可以订阅一个或者多个主题并使用该主题中的所有消息。这个和redis的发布订阅模式一样。
    在这里插入图片描述

kafka的特点

  • 可靠性-kafka是分布式、分区、复制和容错的。
  • 可扩展性-kafka消息传递系统轻松缩放,无需停机。
  • 耐用性-kafka使用‘分布式提交日志’,这意味着消息会尽可能快的保留在磁盘上,因此他是持久的
  • 性能-kafka对于发布订阅消息都具有高吞吐量。即使存储了许多TB的消息,它也保持稳定的性能。保证零停机和零数据丢失。
    kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。在架构中起到解偶、削峰、异步处理的作用

kafka关键术语

1、消息的生产者叫Producer,消息的接受者叫Consumer,生产者将数据保存到kafka集群中,消费者从中获取消息进行处理
2、broker:英文含义中间人。在kafka中存储消息。
3、主题(topic):一个topic保存的同一类消息,生产者要发送消息必须制定topic。
4、分区(partition):每个topic都可以分成多个partition,每一个partition在存储层面都是append log文件。分区的根本原因是:kafka基于文件存储,当文件达到一定程度,磁盘空间告急,因此采用分区的办法,一个分区一个文件,这样就可以将数据存储到不同的server,另外这样做也可以负载均衡。
5、偏移量(Offset):一个分区对应一个磁盘上的文件,而消息在文件中的位置称为offset(便宜量),offset为一个long型数字,它可以唯一标记一条消息。由于kafka并没有提供其他的索引机制来存储offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行‘随机读写’。

kafka分布式、分区和备份数据流图

在这里插入图片描述

安装部署kafka

系统:mac10.15.7 (19H2)

brew install kafka
brew install zookeeper

修改 /usr/local/etc/kafka/server.properties, 找到 listeners=PLAINTEXT://:9092 那一行,把注释取消掉。
然后修改为:

listeners=PLAINTEXT://localhost:9092

以服务的方式启动

brew services start zookeeper
brew services start kafka

数据传输的事务定义:

数据传输的事务定义有以下三种级别:

最多一次(at most once):消息不会被重复发送,最多被传输一次,但也有可能一次不传输。更精确的来说就是发出数据就可以,不关心broker的写入状态
最少一次(at least once):消息不会被漏发送,最少被传输一次,但也有可能被重复传输。
精确的一次(exactly once):不会漏传输也不会重复传输,每个消息都被传输一次而且仅仅被传输一次,这是大家期望的

站在producer分析三种语义

at most once意味着producer发送完一条消息后,不会确认消息是否成功。那么就存在丢失的可能。
at least once意味着producer发送完一条消息后,会议确认消息是否成功,如果producer没有收到broker的ack确认消息,那么将不断尝试重新发送。那就存在消息重复的可能性。
exactly once意味着producer的发送是幂等的。意味着消息无论发送多少遍,最终broker上的记录只有一天不重复的数据

producer at least once配置
kafka默认消息语义就是至少一次,意味着不用配置

{
		'acks': 1, 

}
可能会造成的问题:
producer发送数据给broker等待ack,partition的leader写入了数据回复了ack,
但是在isr动态队列中的follower同步数据的时候leader挂掉了,那么会造成数据丢失

producer at most once配置

{
	'acks': 0
	
}
producer只管发送一次数据,不等待broker的ack状态回复。此时如果broker挂掉之后,就会造成数据丢失。

acks=0表示期望的broker的确认数。0:producer发完消息后不会等待任何broker确认;1表示会等待broker集群中的leader的确认写入消息;设置为all表示等待broker集群的leader和其所有的follower的确认写入消息
retires表示发送失败重新发送的次数。配置了retires后,如果没有将max_in_flight_requests_per_connection配置为1,有可能在造成乱序的结果。max_in_flight_requests_per_connection的配置代表着一个producer同时可以发送的未收到确认的消息数量。如果值大于1,那么可能发送了msg1后,在没有收到确认就发送了msg2,此时msg1失败后重新发送,而msg2发送成功,就造成了borker上消息的乱序。这个配置默认值为5

producer Exactly once配置

{
	'acks': 'all'
	
}
producer发送数据的等待ack,isr动态队列中的follower同步leader的数据,
但是此时的isr队列只有partition-leader那么就会丢失数据;
producer发送数据等待ack,isr动态队列中的follower同步leader的数据,
但是在leader返回ack之前,leader挂了,那么producer会重新发送,导致数据重复

再让我们来看看kafka-python的producer的源码,我这里只分析配置:

DEFAULT_CONFIG = {
        'bootstrap_servers': 'localhost',  # kafka的serverip
        'client_id': None, # 客户端版本号,默认值为kafka-python-producer-#(唯一数字)
        'key_serializer': None, # 将发送的参数key序列化为bytes
        'value_serializer': None, # 将发送的参数value序列化为bytes
        'acks': 1, # 包含(0, 1, 'all')上面已经解释了
        'bootstrap_topics_filter': set(), # 对topic去重
        'compression_type': None, # 数据的压缩类型,支持'gzip', 'snappy', 'lz4', 'zstd' or None
        'retries': 0, # 重试次数
        'batch_size': 16384, # 发送的批量处理的大小
        'linger_ms': 0,
        'partitioner': DefaultPartitioner(),
        'buffer_memory': 33554432,
        'connections_max_idle_ms': 9 * 60 * 1000,
        'max_block_ms': 60000,
        'max_request_size': 1048576,
        'metadata_max_age_ms': 300000,
        'retry_backoff_ms': 100,
        'request_timeout_ms': 30000,
        'receive_buffer_bytes': None,
        'send_buffer_bytes': None,
        'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
        'sock_chunk_bytes': 4096,  # undocumented experimental option
        'sock_chunk_buffer_count': 1000,  # undocumented experimental option
        'reconnect_backoff_ms': 50,
        'reconnect_backoff_max_ms': 1000,
        'max_in_flight_requests_per_connection': 5,
        'security_protocol': 'PLAINTEXT',
        'ssl_context': None,
        'ssl_check_hostname': True,
        'ssl_cafile': None,
        'ssl_certfile': None,
        'ssl_keyfile': None,
        'ssl_crlfile': None,
        'ssl_password': None,
        'ssl_ciphers': None,
        'api_version': None,
        'api_version_auto_timeout_ms': 2000,
        'metric_reporters': [],
        'metrics_num_samples': 2,
        'metrics_sample_window_ms': 30000,
        'selector': selectors.DefaultSelector,
        'sasl_mechanism': None,
        'sasl_plain_username': None,
        'sasl_plain_password': None,
        'sasl_kerberos_service_name': 'kafka',
        'sasl_kerberos_domain_name': None,
        'sasl_oauth_token_provider': None
    }

kafka如何实现幂等发送
kafka实现幂等的关键就是要实现broker的去重。为了实现消息发送的幂等性,kafka引入了两个概念:

  • pid。每个新的producer在初始化的时候会被分配一个唯一的PID,这个PID对于用户是不可见的。

  • Sequence Number。对于每个PID,该Producer发送数据的每个<Topic,Partition>都对应一个从0开始单调递增的Sequence Number。而borker端会对<PID,Topic,Partition>做缓存,当具有相同主键的消息提交的时,broker只会持久化一条。

但是如果PID发生变化,同时不同的partition也具有不同的主键,那么幂等性无法保证跨分区跨会话的exactly once

也就是说幂等性的成立需要保证 单次会话同一个分区

站在Consumer分析三种语义
consumer at least once

意味着consumer对一条消息可能多次消费。下面的情况:consumer先读取消息,然后处理这条消息,最后提交offset。在处理消息时成功后,consumer宕机了,此时offset还未提交,下一次读取消息时依旧是这条消息,那么处理消息的逻辑又将被执行一遍,就是at least once消费。

  • 配置enable_auto_commit=false。禁止后台自动提交offset
  • 手动调用consumer.commit_async()来提交offset。手动保证了offset即使更新。

通过手动提交offset,就可以实现at least once语义

consumer at most once

意味着consumer对一条消息最多消费一次,因此存在消息消费失败依旧提交offset的情况。考虑下面的情况:consumer首先读取消息。然后提交offset,最后处理这条消息。在处理消息时,consumer宕机了,此时offset已提交,下次读取消息时就是下一条消息,这就是at most once。

  • 配置enable_auto_commit=True。后台定时提交offset。
  • auto_commit_interval_ms配置一个很小的数值。auto_commit_interval_ms表示后台提交offset的时间间隔

通过自动提交offset,并且将定时提交时间间隔设置的很小,就可以实现consumer at most once语义

exactly once

exactly once意味着消息的消费处理逻辑和offset的提交是原子性的,即消息消费成功后offset改变,消息消费失败offset也能回滚

  • isolation.level=read_committed。此参数表示何种类型的message对consumer可见。

一个常见的exactly once的使用场景是:但我们订阅了一个topic,然后往另一个topic里写数据时,我们希望连个操作是原子性的,即如果写入消息失败了,那么我们希望读取消息的offset可以回滚。
这个特性可以通过kafka的transaction特性实现。kafka是在0.11版本之后开始提供事务特性的。我们可以将consumer读取数据和producer写入数据放在同一个事务中,在事务没有成功结束前,所有这个事务包含的消息都被标记为uncommitted。只有事务成功后,所有的消息才会被标记为committed。offset消息是以消息的方式存储在broker的__consumer_offsets topic中的。因此在事务开始后,consumer读取消息后,所有的offset消息都是uncommitted状态,所有的producer写入的消息也都是uncommitted状态。
consumer通过配置isolation.level来决定uncommitted状态的message是否对consumer可见。

isolation.level有两个可选值:read_committed–>表示所有事务未提交的数据都对consumer不可见,read_uncommitted相反

再来看看kafka的消息存储
server.properties配置文件
在这里插入图片描述
在这里插入图片描述
这里文件夹下存储这每个主题,例如我这设置的topic就是howtousekafka-0,里面的东西包括消息文件和索引文件。
在这里插入图片描述
其中的index文件记录offset的位置和消息的消息的消息的长度。log文件记录消息。

使用python操作kafka

pip install kafka-python

保证zookeeper和kafka启动的情况下:

最简单的生产者:

import json
import time
import datetime

from kafka import KafkaProducer
from config import config



producer = KafkaProducer(bootstrap_servers=config.SERVER,
                         value_serializer=lambda m: json.dumps(m).encode())

for i in range(100):
    data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    producer.send(config.TOPIC, data)
    time.sleep(1)

最简单的消费者:

from config import config
from kafka import KafkaConsumer

consumer = KafkaConsumer(config.TOPIC,
                         bootstrap_servers=config.SERVER,
                         group_id='test',
                         auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-07 11:46:23  更:2021-07-07 11:47:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 21:52:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码