一、kafka介绍
kafka官网地址:http://kafka.apache.org/
kafka官方文档地址:http://kafka.apache.org/documentation/
1. 定义
Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
2. 消息队列
1.解耦(类似Spring的IOC) 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 2.可恢复性 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 3.缓冲 有助于控制和优化数据流经过系统的速度, 解决生产消息和消费消息的处理速度不一致的情况。 4.灵活性 & 峰值处理能力(削峰) 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。 5.异步通信 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
3. 消费模式
3.1 点对点模式
一对一,消费者主动拉取数据,消息收到后消息清除
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后, queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
3.2 发布/订阅模式
一对多,消费者消费数据之后不会清除消息
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
4. 基础架构
- Producer : 消息生产者,就是向 Kafka ;
- Consumer : 消息消费者,向 Kafka broker 取消息的客户端;
- Consumer Group (CG): 消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
- Broker :经纪人 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
- Topic : 话题,可以理解为一个队列, 生产者和消费者面向的都是一个 topic;
- Partition: 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
- Replica: 副本(Replication),为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 Kafka仍然能够继续工作, Kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
- Leader: 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
- Follower: 每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。 leader 发生故障时,某个 Follower 会成为新的 leader。
二、kafka安装
1. jar包下载
下载地址:http://kafka.apache.org/downloads
2. 安装
- 解压安装包
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2)修改解压后的文件名称
mv kafka_2.11-0.11.0.0/ kafka
3)创建data文件夹
cd kafka
mkdir data
4)修改配置文件
cd config/
vi server.properties
写入以下内容
#broker 的全局唯一编号,不能重复
broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接 Zookeeper 集群地址
zookeeper.connect=localhost:2181
5)配置环境变量
sudo vi /etc/profile
写入以下内容
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
执行source /etc/profile 生效
6)启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
3. 命令行操作
1)查看当前服务器中的所有topic
bin/kafka-topics.sh --zookeeper 10.10.2.114:2181 --list
2)创建topic
bin/kafka-topics.sh --zookeeper 10.10.2.114:2181 --create --replication-factor 3 --partitions 1 --
topic first
说明: – topic 定义 topic 名 – replication-factor 定义副本数 – partitions 定义分区数 3)删除topic
bin/kafka-topics.sh --zookeeper 10.10.2.114:2181 --delete --topic first
4)发送消息
bin/kafka-console-producer.sh --broker-list 10.10.2.114:9092 --topic first
5)消费消息
bin/kafka-console-consumer.sh --zookeeper 10.10.2.114:2181 --topic first
6)查看某个topic详情
bin/kafka-topics.sh --zookeeper 10.10.2.1114:2181 --describe --topic first
7)修改分区数
bin/kafka-topics.sh --zookeeper 10.10.2.114:2181 --alter --topic first --partitions 6
三、kafka案例
1.生产者代码
异步发送普通生产者
public class CustomProducer {
public static void main(String[] args) {
Properties props = new Properties();
// kafka 集群, broker-list
props.put("bootstrap.servers", "10.10.2.114:9092");
//可用ProducerConfig.ACKS_CONFIG 代替 "acks"
//props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put("acks", "all");
// 重试次数
props.put("retries", 1);
// 批次大小
props.put("batch.size", 16384);
// 等待时间
props.put("linger.ms", 1);
// RecordAccumulator 缓冲区大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("test", "test-" + Integer.toString(i),
"test-" + Integer.toString(i)));
}
producer.close();
}
}
异步发送回调生产者
public class CallBackProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.10.2.114:9092");//kafka 集群, broker-list
props.put("acks", "all");
props.put("retries", 1);//重试次数
props.put("batch.size", 16384);//批次大小
props.put("linger.ms", 1);//等待时间
props.put("buffer.memory", 33554432);//RecordAccumulator 缓冲区大小
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("test",
"test" + Integer.toString(i)), new Callback() {
//回调函数, 该方法会在 Producer 收到 ack 时调用,为异步调用
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata.partition() + " - " + metadata.offset());
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
}
}
回调函数会在 producer 收到 ack 时调用,为异步调用, 该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。
注意: 消息发送失败会自动重试,不需要我们在回调函数中手动重试。
带自定义分区器的生产者
public class MyPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
// TODO Auto-generated method stub
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// TODO Auto-generated method stub
return 0;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
具体内容填写可参考默认分区器org.apache.kafka.clients.producer.internals.DefaultPartitioner
然后Producer配置中注册使用
Properties props = new Properties();
...
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
...
Producer<String, String> producer = new KafkaProducer<>(props);
同步发送生产者
同步发送的意思就是,一条消息发送之后,会阻塞当前线程, 直至返回 ack。
由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("test", "test - 1"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
...
}
}).get();//<----------------------
}
2.消费者代码
简单消费者
为了使我们能够专注于自己的业务逻辑, Kafka 提供了自动提交 offset 的功能。
自动提交 offset 的相关参数:
- enable.auto.commit:是否开启自动提交 offset 功能
- auto.commit.interval.ms:自动提交 offset 的时间间隔
public class CustomConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "abc");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
消费者重置offset
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
由于 consumer 在消费过程中可能会出现断电宕机等故障, consumer 恢复后,需要从故障前的位置的继续消费,所以consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
Properties props = new Properties();
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("group.id", "abcd");//组id需另设,否则看不出上面一句的配置效果
...
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
从结果看,props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”);与命令行中bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning的–from-beginning拥有相同的作用。
消费者自定义存储offset
无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。 offset 的维护是相当繁琐的, 因为需要考虑到消费者的 Rebalace。
当有新的消费者加入消费者组、 已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
要实现自定义存储 offset,需要借助 ConsumerRebalanceListener, 以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。(可将offset存入MySQL数据库)
public class CustomSaveOffset {
private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
// 创建配置信息
Properties props = new Properties();
...
//<--------------------------------------
// 关闭自动提交 offset
props.put("enable.auto.commit", "false");
...
// 创建一个消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消费者订阅主题
consumer.subscribe(Arrays.asList("first"),
//<-------------------------------------
new ConsumerRebalanceListener() {
// 该方法会在 Rebalance 之前调用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
// 该方法会在 Rebalance 之后调用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
consumer.seek(partition, getOffset(partition));// 定位到最近提交的 offset 位置继续消费
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);// 消费者拉取数据
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
}
commitOffset(currentOffset);// 异步提交
}
}
// 获取某分区的最新 offset
private static long getOffset(TopicPartition partition) {
return 0;
}
// 提交该消费者所有分区的 offset
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
}
}
|