kafka介绍
简介:kafka是一个事件流平台,专门为分布式高吞吐量系统而设计的消息传递系统,相对其他消息系统有更好的吞吐量、内置分区、复制和固有的容错能留,使其更适合处理大规模的消息程序。
kafka的两种模式
- 点对点:消息被保留在队列中。一个或者多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。消息被消费后将会从队列消失。

- 发布-订阅(pub-sub)消息系统:不同于点对点的是消息被保留在主题中,消费者可以订阅一个或者多个主题并使用该主题中的所有消息。这个和redis的发布订阅模式一样。

kafka的特点
- 可靠性-kafka是分布式、分区、复制和容错的。
- 可扩展性-kafka消息传递系统轻松缩放,无需停机。
- 耐用性-kafka使用‘分布式提交日志’,这意味着消息会尽可能快的保留在磁盘上,因此他是持久的
- 性能-kafka对于发布订阅消息都具有高吞吐量。即使存储了许多TB的消息,它也保持稳定的性能。保证零停机和零数据丢失。
kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。在架构中起到解偶、削峰、异步处理的作用
kafka关键术语
1、消息的生产者叫Producer,消息的接受者叫Consumer,生产者将数据保存到kafka集群中,消费者从中获取消息进行处理 2、broker:英文含义中间人。在kafka中存储消息。 3、主题(topic):一个topic保存的同一类消息,生产者要发送消息必须制定topic。 4、分区(partition):每个topic都可以分成多个partition,每一个partition在存储层面都是append log文件。分区的根本原因是:kafka基于文件存储,当文件达到一定程度,磁盘空间告急,因此采用分区的办法,一个分区一个文件,这样就可以将数据存储到不同的server,另外这样做也可以负载均衡。 5、偏移量(Offset):一个分区对应一个磁盘上的文件,而消息在文件中的位置称为offset(便宜量),offset为一个long型数字,它可以唯一标记一条消息。由于kafka并没有提供其他的索引机制来存储offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行‘随机读写’。
kafka分布式、分区和备份数据流图

安装部署kafka
系统:mac10.15.7 (19H2)
brew install kafka brew install zookeeper
修改 /usr/local/etc/kafka/server.properties, 找到 listeners=PLAINTEXT://:9092 那一行,把注释取消掉。 然后修改为:
listeners=PLAINTEXT://localhost:9092
以服务的方式启动
brew services start zookeeper brew services start kafka
数据传输的事务定义:
数据传输的事务定义有以下三种级别:
最多一次(at most once):消息不会被重复发送,最多被传输一次,但也有可能一次不传输。更精确的来说就是发出数据就可以,不关心broker的写入状态 最少一次(at least once):消息不会被漏发送,最少被传输一次,但也有可能被重复传输。 精确的一次(exactly once):不会漏传输也不会重复传输,每个消息都被传输一次而且仅仅被传输一次,这是大家期望的
站在producer分析三种语义
at most once意味着producer发送完一条消息后,不会确认消息是否成功。那么就存在丢失的可能。 at least once意味着producer发送完一条消息后,会议确认消息是否成功,如果producer没有收到broker的ack确认消息,那么将不断尝试重新发送。那就存在消息重复的可能性。 exactly once意味着producer的发送是幂等的。意味着消息无论发送多少遍,最终broker上的记录只有一天不重复的数据
producer at least once配置 kafka默认消息语义就是至少一次,意味着不用配置
{
'acks': 1,
}
可能会造成的问题:
producer发送数据给broker等待ack,partition的leader写入了数据回复了ack,
但是在isr动态队列中的follower同步数据的时候leader挂掉了,那么会造成数据丢失
producer at most once配置
{
'acks': 0
}
producer只管发送一次数据,不等待broker的ack状态回复。此时如果broker挂掉之后,就会造成数据丢失。
acks=0表示期望的broker的确认数。0:producer发完消息后不会等待任何broker确认;1表示会等待broker集群中的leader的确认写入消息;设置为all表示等待broker集群的leader和其所有的follower的确认写入消息 retires表示发送失败重新发送的次数。配置了retires后,如果没有将max_in_flight_requests_per_connection配置为1,有可能在造成乱序的结果。max_in_flight_requests_per_connection的配置代表着一个producer同时可以发送的未收到确认的消息数量。如果值大于1,那么可能发送了msg1后,在没有收到确认就发送了msg2,此时msg1失败后重新发送,而msg2发送成功,就造成了borker上消息的乱序。这个配置默认值为5
producer Exactly once配置
{
'acks': 'all'
}
producer发送数据的等待ack,isr动态队列中的follower同步leader的数据,
但是此时的isr队列只有partition-leader那么就会丢失数据;
producer发送数据等待ack,isr动态队列中的follower同步leader的数据,
但是在leader返回ack之前,leader挂了,那么producer会重新发送,导致数据重复
再让我们来看看kafka-python的producer的源码,我这里只分析配置:
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
'client_id': None,
'key_serializer': None,
'value_serializer': None,
'acks': 1,
'bootstrap_topics_filter': set(),
'compression_type': None,
'retries': 0,
'batch_size': 16384,
'linger_ms': 0,
'partitioner': DefaultPartitioner(),
'buffer_memory': 33554432,
'connections_max_idle_ms': 9 * 60 * 1000,
'max_block_ms': 60000,
'max_request_size': 1048576,
'metadata_max_age_ms': 300000,
'retry_backoff_ms': 100,
'request_timeout_ms': 30000,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'sock_chunk_bytes': 4096,
'sock_chunk_buffer_count': 1000,
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 1000,
'max_in_flight_requests_per_connection': 5,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'ssl_crlfile': None,
'ssl_password': None,
'ssl_ciphers': None,
'api_version': None,
'api_version_auto_timeout_ms': 2000,
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
}
kafka如何实现幂等发送 kafka实现幂等的关键就是要实现broker的去重。为了实现消息发送的幂等性,kafka引入了两个概念:
-
pid。每个新的producer在初始化的时候会被分配一个唯一的PID,这个PID对于用户是不可见的。 -
Sequence Number。对于每个PID,该Producer发送数据的每个<Topic,Partition>都对应一个从0开始单调递增的Sequence Number。而borker端会对<PID,Topic,Partition>做缓存,当具有相同主键的消息提交的时,broker只会持久化一条。
但是如果PID发生变化,同时不同的partition也具有不同的主键,那么幂等性无法保证跨分区跨会话的exactly once
也就是说幂等性的成立需要保证 单次会话同一个分区
站在Consumer分析三种语义 consumer at least once
意味着consumer对一条消息可能多次消费。下面的情况:consumer先读取消息,然后处理这条消息,最后提交offset。在处理消息时成功后,consumer宕机了,此时offset还未提交,下一次读取消息时依旧是这条消息,那么处理消息的逻辑又将被执行一遍,就是at least once消费。
- 配置enable_auto_commit=false。禁止后台自动提交offset
- 手动调用consumer.commit_async()来提交offset。手动保证了offset即使更新。
通过手动提交offset,就可以实现at least once语义
consumer at most once
意味着consumer对一条消息最多消费一次,因此存在消息消费失败依旧提交offset的情况。考虑下面的情况:consumer首先读取消息。然后提交offset,最后处理这条消息。在处理消息时,consumer宕机了,此时offset已提交,下次读取消息时就是下一条消息,这就是at most once。
- 配置enable_auto_commit=True。后台定时提交offset。
- auto_commit_interval_ms配置一个很小的数值。auto_commit_interval_ms表示后台提交offset的时间间隔
通过自动提交offset,并且将定时提交时间间隔设置的很小,就可以实现consumer at most once语义
exactly once
exactly once意味着消息的消费处理逻辑和offset的提交是原子性的,即消息消费成功后offset改变,消息消费失败offset也能回滚
- isolation.level=read_committed。此参数表示何种类型的message对consumer可见。
一个常见的exactly once的使用场景是:但我们订阅了一个topic,然后往另一个topic里写数据时,我们希望连个操作是原子性的,即如果写入消息失败了,那么我们希望读取消息的offset可以回滚。 这个特性可以通过kafka的transaction特性实现。kafka是在0.11版本之后开始提供事务特性的。我们可以将consumer读取数据和producer写入数据放在同一个事务中,在事务没有成功结束前,所有这个事务包含的消息都被标记为uncommitted。只有事务成功后,所有的消息才会被标记为committed。offset消息是以消息的方式存储在broker的__consumer_offsets topic中的。因此在事务开始后,consumer读取消息后,所有的offset消息都是uncommitted状态,所有的producer写入的消息也都是uncommitted状态。 consumer通过配置isolation.level来决定uncommitted状态的message是否对consumer可见。
isolation.level有两个可选值:read_committed–>表示所有事务未提交的数据都对consumer不可见,read_uncommitted相反
再来看看kafka的消息存储 server.properties配置文件   这里文件夹下存储这每个主题,例如我这设置的topic就是howtousekafka-0,里面的东西包括消息文件和索引文件。  其中的index文件记录offset的位置和消息的消息的消息的长度。log文件记录消息。
使用python操作kafka
pip install kafka-python
保证zookeeper和kafka启动的情况下:
最简单的生产者:
import json
import time
import datetime
from kafka import KafkaProducer
from config import config
producer = KafkaProducer(bootstrap_servers=config.SERVER,
value_serializer=lambda m: json.dumps(m).encode())
for i in range(100):
data = {'num': i, 'ts': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
producer.send(config.TOPIC, data)
time.sleep(1)
最简单的消费者:
from config import config
from kafka import KafkaConsumer
consumer = KafkaConsumer(config.TOPIC,
bootstrap_servers=config.SERVER,
group_id='test',
auto_offset_reset='earliest')
for msg in consumer:
print(msg.value)
|