Kafka
1. Installation
Cluster plan
| hadoop151 | hadoop152 | hadoop153 |
| --- | --- | --- |
| zookeeper | zookeeper | zookeeper |
| kafka | kafka | kafka |
Installation steps
- Step 1: Upload kafka_2.11-0.11.0.0.tgz to the server and extract it
- Step 2: Create a data directory under the kafka directory
- Step 3: Edit the server.properties file in kafka's config directory
# Unique id of this broker within the cluster
broker.id=1
# Allow topics to be physically deleted with --delete
delete.topic.enable=true
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Directory where Kafka stores message data
log.dirs=/opt/module/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
# Retain messages for 7 days
log.retention.hours=168
# ZooKeeper quorum used by the cluster
zookeeper.connect=hadoop151:2181,hadoop152:2181,hadoop153:2181
- Step 4: Distribute the entire kafka directory to the other two servers
- Step 5: In the server.properties files on hadoop152 and hadoop153, change broker.id to 2 and 3 respectively
- Step 6: Start ZooKeeper on all three servers
- Step 7: Start Kafka on all three servers (to stop a broker later, use bin/kafka-server-stop.sh)
bin/kafka-server-start.sh -daemon <path to server.properties>
bin/kafka-server-start.sh -daemon config/server.properties
2. Common commands
- List all topics in the cluster (--list)
bin/kafka-topics.sh --bootstrap-server <host:9092,...> --list
bin/kafka-topics.sh --bootstrap-server hadoop151:9092,hadoop152:9092,hadoop153:9092 --list
(Note: kafka-topics.sh accepts --bootstrap-server only from Kafka 2.2 onward; on the 0.11 release installed above, use --zookeeper <host:2181,...> as in the commands below.)
- Create a topic (--create)
bin/kafka-topics.sh --zookeeper <zk-host:2181,...> \
--topic <topic> --partitions <num-partitions> --replication-factor <num-replicas> --create
bin/kafka-topics.sh \
--zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 \
--topic fzk --partitions 2 --replication-factor 2 --create
- Delete a topic (--delete); requires delete.topic.enable=true on the brokers (set in server.properties above), otherwise the topic is only marked for deletion
bin/kafka-topics.sh --zookeeper <zk-host:2181,...> --topic <topic> --delete
bin/kafka-topics.sh --zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 --topic fzk --delete
- Describe a topic (--describe)
bin/kafka-topics.sh --zookeeper <zk-host:2181,...> --topic <topic> --describe
bin/kafka-topics.sh --zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 \
--topic fzk --describe
- Change the number of partitions (--alter); the partition count can only be increased, never decreased
bin/kafka-topics.sh --zookeeper <zk-host:2181,...> \
--topic <topic> --partitions <new-partition-count> --alter
bin/kafka-topics.sh --zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 \
--topic fzk --partitions 6 --alter
- Produce messages from the console
bin/kafka-console-producer.sh --broker-list <host:9092,...> --topic <topic>
bin/kafka-console-producer.sh \
--broker-list hadoop151:9092,hadoop152:9092,hadoop153:9092 \
--topic fzk
- Consume messages from the console
bin/kafka-console-consumer.sh --bootstrap-server <host:9092,...> --topic <topic>
bin/kafka-console-consumer.sh \
--bootstrap-server hadoop151:9092,hadoop152:9092,hadoop153:9092 \
--topic fzk
Or, with the deprecated old consumer (offsets stored in ZooKeeper):
bin/kafka-console-consumer.sh --zookeeper <host:2181,...> --topic <topic>
bin/kafka-console-consumer.sh \
--zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 \
--topic fzk
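Adding --from-beginning to either command makes the consumer read the topic from the earliest offset instead of only new messages.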
3. API
Environment setup
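The examples below use the Kafka Java client; add a kafka-clients dependency matching the broker version to the project build (for the 0.11.0.0 install above, the Maven coordinates would be org.apache.kafka:kafka-clients:0.11.0.0).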
Producer
Without a callback
- Classes involved
- KafkaProducer: the producer object used to send data
- ProducerConfig: holds the producer configuration keys
- ProducerRecord: every message to send is wrapped in a ProducerRecord object
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
// acks=all: wait for acknowledgement from all in-sync replicas
properties.put(ProducerConfig.ACKS_CONFIG, "all");
// Number of retries on transient send failures
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
// Batch size in bytes
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// How long to wait for a batch to fill before sending, in milliseconds
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// Total memory available for buffering unsent records, in bytes
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("fzk", "fzk -- " + i));
}
if(producer != null){
producer.close();
}
}
}
With a callback
- The callback is invoked asynchronously when the producer receives the ack for a record. It takes two parameters, RecordMetadata and Exception: if the Exception is null the send succeeded; if it is non-null the send failed.
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CallbackProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("fzk", "fzk -- " + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
// Send succeeded: print the partition and offset the record was written to
System.out.println(recordMetadata.partition() + " --- " + recordMetadata.offset());
} else {
// Send failed
e.printStackTrace();
}
}
});
}
if (producer != null){
producer.close();
}
}
}
Custom partitioner (Partitioner)
- Step 1: Write the partitioner class
package com.itfzk.kafka.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// Custom routing logic goes here; as an example, spread records across the topic's partitions by value hash
int numPartitions = cluster.partitionCountForTopic(topic);
return value == null ? 0 : (value.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
public void close() {
}
public void configure(Map<String, ?> map) {
}
}
- Step 2: Use the custom partitioner
- Register it in the producer properties via ProducerConfig.PARTITIONER_CLASS_CONFIG
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class PartitionerProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.itfzk.kafka.partitioner.MyPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("fzk", "fzk -- " + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println(recordMetadata.partition() + " --- " + recordMetadata.offset());
}
}
});
}
if (producer != null){
producer.close();
}
}
}
Consumer
Automatic offset commit
- Classes involved
- KafkaConsumer: the consumer object used to consume data
- ConsumerConfig: holds the consumer configuration keys
- ConsumerRecord: every message received is wrapped in a ConsumerRecord object
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
// Commit offsets automatically, every AUTO_COMMIT_INTERVAL_MS milliseconds
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Consumer group id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("fff", "zzz", "kkk"));
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + " --- " + consumerRecord.value());
}
}
}
}
Manual offset commit
Asynchronous commit
- Disable automatic offset commit:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- Asynchronous commit:
consumer.commitAsync()
package com.itfzk.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class ASyncCommitConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("fff", "zzz", "kkk"));
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + " --- " + consumerRecord.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null){
// The commit failed; map holds the offsets that could not be committed
System.out.println("commit failed for offsets: " + map);
}
}
});
}
}
}
Synchronous commit
- Disable automatic offset commit:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- Synchronous commit (blocks until the commit succeeds; unlike commitAsync, it retries on retriable failures):
consumer.commitSync();
package com.itfzk.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class SyncCommitConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("fff", "zzz", "kkk"));
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + " --- " + consumerRecord.value());
}
consumer.commitSync();
}
}
}
Custom interceptor (Interceptor)
How it works
- An interceptor gives the user a chance to customize messages (for example, modify them) before they are sent and before the producer callback logic runs. The producer also allows several interceptors to be applied to the same message in a specified order, forming an interceptor chain.
- Implement the interface org.apache.kafka.clients.producer.ProducerInterceptor, which defines the following methods:
- configure(Map<String, ?> map)
- onSend(ProducerRecord<String, String> producerRecord)
- Wrapped into KafkaProducer.send, i.e. it runs on the user's main thread. The producer guarantees it is called before the message is serialized and before its partition is computed.
- onAcknowledgement(RecordMetadata recordMetadata, Exception e)
- Called after the message has been successfully sent from the RecordAccumulator to the Kafka broker, or when the send fails, and usually before the producer callback logic is triggered. onAcknowledgement runs on the producer's I/O thread, so it must not contain heavy logic, otherwise it slows down the producer's send throughput.
- close()
- Closes the interceptor; mainly used for resource cleanup. As noted above, interceptors may run on multiple threads, so the implementation must ensure thread safety. If several interceptors are configured, the producer calls them in the specified order and only logs any exception an interceptor throws to the error log instead of propagating it up the chain; keep this in mind when using them.
Example
Requirements
- Build a simple chain of two interceptors:
- the first interceptor prepends a timestamp to the message value before it is sent;
- the second interceptor updates the count of successfully or unsuccessfully sent messages after each send.
Implementation
- Timestamp interceptor: prepends a timestamp to each message value
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> map) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
producerRecord.topic(),
producerRecord.partition(),
producerRecord.key(),
System.currentTimeMillis() + " -- " + producerRecord.value());
return record;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
@Override
public void close() {
}
}
- Counter interceptor: counts successful and failed sends and prints the totals when closed
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CountInterceptor implements ProducerInterceptor<String, String> {
private int successCount = 0;
private int errorCount = 0;
@Override
public void configure(Map<String, ?> map) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
return producerRecord;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if(e == null){
successCount++;
}else{
errorCount++;
}
}
@Override
public void close() {
System.out.println("成功数量:" + successCount);
System.out.println("失败数量:" + errorCount);
}
}
- Producer that registers the interceptor chain
import org.apache.kafka.clients.producer.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class InterceptorProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
List<String> interceptorList = new ArrayList<String>();
interceptorList.add("com.itfzk.kafka.interceptor.TimeInterceptor");
interceptorList.add("com.itfzk.kafka.interceptor.CountInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorList);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("fzk", "fzk -- " + i));
}
if (producer != null){
producer.close();
}
}
}
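Note that producer.close() must be called for CountInterceptor's totals to be printed: closing the producer is what invokes each interceptor's close() method.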
4. Kafka monitoring (Eagle)
Installation
- Step 1: Modify the Kafka start script (kafka-server-start.sh) to export a JMX port and heap options:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export JMX_PORT="9999"
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
fi
- Step 2: Distribute the modified kafka-server-start.sh to the other servers
- Step 3: Upload kafka-eagle-bin-1.3.7.tar.gz to the server and extract it
- Step 4: In Eagle's bin directory, make the start script (ke.sh) executable
- Step 5: Edit the configuration file system-config.properties in Eagle's conf directory
######################################
# multi zookeeper&kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop151:2181,hadoop152:2181,hadoop153:2181
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# enable kafka metrics
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false
######################################
# kafka jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop151:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
- Step 6: Add the following environment variables, then run source /etc/profile
export KE_HOME=/opt/software/eagle/kafka-eagle-web-1.3.7
export PATH=$PATH:$KE_HOME/bin
- Step 7: Start ZooKeeper and Kafka
- Step 8: Start Eagle (bin/ke.sh start)
- Step 9: Open the web page to view the monitoring data (the username and password are printed when Eagle starts)
http://192.168.9.102:8048/ke
5. Integrating Kafka with Flume
- Step 1: Write the Flume configuration file (flume-kafka.conf)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop151
a1.sources.r1.port = 44444
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = fzk
a1.sinks.k1.kafka.bootstrap.servers = hadoop151:9092,hadoop152:9092,hadoop153:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Step 2: Start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server hadoop151:9092,hadoop152:9092,hadoop153:9092 \
--topic fzk
- Step 3: Start the Flume agent
bin/flume-ng agent -n <agent name> -c conf/ -f <path to flume-kafka.conf>
bin/flume-ng agent -n a1 -c conf/ -f job/flume-kafka.conf