
Kafka (4): Kafka & Java Basic API

1. Environment

The Kafka API is tested from Java against the cluster that was set up in Kafka (3).

The test machine (Windows) needs the following hosts entries:

192.168.141.131 CentOSA
192.168.141.132 CentOSB
192.168.141.133 CentOSC

Maven configuration

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.7</version>
        </dependency>

2. Listing, creating, deleting, and describing topics

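import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

public class KafkaTopicDML {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Create the AdminClient, which holds the connection configuration for topic management
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");

        AdminClient adminClient = AdminClient.create(properties);

        // List the existing topics
        ListTopicsResult listTopics = adminClient.listTopics();
        Set<String> names = listTopics.names().get();
        for (String name : names) {
            System.out.println("topic: " + name);
        }

        // Asynchronous create: the call returns without waiting for the result
        adminClient.createTopics(Arrays.asList(new NewTopic("topic03", 3 /* partitions */, (short) 3 /* replication factor */)));

        // Synchronous create: block on the returned future
        //CreateTopicsResult topic05 = adminClient.createTopics(Arrays.asList(new NewTopic("topic05", 3, (short) 3)));
        //topic05.all().get();

        // Asynchronous delete
        adminClient.deleteTopics(Arrays.asList("topic05", "topic06"));

        // Synchronous delete
        //DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("topic05", "topic06"));
        //deleteTopicsResult.all().get();

        // Synchronous describe. The names were listed before the create and delete calls above,
        // so this can fail if one of the listed topics has just been deleted.
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(names);
        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
        System.out.println(stringTopicDescriptionMap);

        // Close the adminClient
        adminClient.close();
    }
}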

3. Producer and consumer

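// Producer
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerDemo {

    public static void main(String[] args) {
        // 1. Create the KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic02", "key" + i, "value" + i);
            // Send the record (asynchronously)
            kafkaProducer.send(record);
        }

        // close() flushes any records that are still buffered
        kafkaProducer.close();
    }
}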

// Consumer
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerDemo {

    public static void main(String[] args) {
        // 1. Create the KafkaConsumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // Subscribe to every topic whose name starts with "topic02"
        kafkaConsumer.subscribe(Pattern.compile("^topic02.*"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}

Test: consuming without assigning partitions explicitly

###Start one consumer, S1 ====> it is assigned all three partitions
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-0, topic02-1, topic02-2])

###Start a second consumer, S2 ====> one of S1's partitions is reassigned to S2, the other member of the same group
##S1 log
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Finished assignment for group at generation 8: {consumer-g2-1-58d640e4-0269-4c1c-a212-5f21bc761cca=Assignment(partitions=[topic02-0, topic02-1]), consumer-g2-1-81b66c7f-6c84-4095-809c-ce7afb4100a9=Assignment(partitions=[topic02-2])}

=======================================================================

##S2 log
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-2])

###Start a third consumer, S3 ====> another of S1's partitions is reassigned to S3, also a member of the same group
##S1 log
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Finished assignment for group at generation 9: {consumer-g2-1-27f9cab4-cacc-425c-81c8-6e9fc141580c=Assignment(partitions=[topic02-0]), consumer-g2-1-58d640e4-0269-4c1c-a212-5f21bc761cca=Assignment(partitions=[topic02-1]), consumer-g2-1-81b66c7f-6c84-4095-809c-ce7afb4100a9=Assignment(partitions=[topic02-2])}

###S3 log
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-0])

=======================================================================

###Start a fourth consumer, S4 ====> no partitions are left to assign to the new group member
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[])

=======================================================================
##Stop S2 ====> the standby S4 takes over the partition that S2 was consuming
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[])
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Adding newly assigned partitions: 
INFO  - AbstractCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Attempt to heartbeat failed since group is rebalancing
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Revoke previously assigned partitions 
INFO  - AbstractCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] (Re-)joining group
INFO  - AbstractCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Successfully joined group with generation Generation{generationId=11, memberId='consumer-g2-1-8f55c39e-9bf4-45a5-a35b-42ab9eb64d4f', protocol='range'}
INFO  - AbstractCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Successfully synced group in generation Generation{generationId=11, memberId='consumer-g2-1-8f55c39e-9bf4-45a5-a35b-42ab9eb64d4f', protocol='range'}
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Notifying assignor about the new Assignment(partitions=[topic02-2])
INFO  - ConsumerCoordinator        - [Consumer clientId=consumer-g2-1, groupId=g2] Adding newly assigned partitions: topic02-2

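Conclusion: each time a consumer joins the group, the group rebalances and redistributes the partitions among its members. Once the group has more members than there are partitions, the extra consumers receive no partitions and wait as standbys. Only when an active consumer goes down does a standby take over its partitions.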
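
The logs above report protocol='range'. As a rough illustration of how a range-style strategy divides one topic's partitions among the sorted group members, here is a minimal sketch. It is a simplified model under stated assumptions, not Kafka's actual assignor code; the class name RangeAssignmentSketch and the member ids s1 to s4 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical model of range-style assignment for a single topic.
final class RangeAssignmentSketch {

    // Returns member id -> assigned partition numbers, members in sorted order.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> memberIds) {
        List<String> sorted = new ArrayList<>(new TreeSet<>(memberIds));
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return result;
        }
        int perMember = partitionCount / sorted.size(); // base share of every member
        int extra = partitionCount % sorted.size();     // the first 'extra' members get one more
        for (int i = 0; i < sorted.size(); i++) {
            int start = perMember * i + Math.min(i, extra);
            int length = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            result.put(sorted.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, Arrays.asList("s1")));                   // {s1=[0, 1, 2]}
        System.out.println(assign(3, Arrays.asList("s1", "s2")));             // {s1=[0, 1], s2=[2]}
        System.out.println(assign(3, Arrays.asList("s1", "s2", "s3", "s4"))); // {s1=[0], s2=[1], s3=[2], s4=[]}
    }
}
```

With three partitions, the fourth member ends up with an empty list, which matches the empty assignment S4 receives in the logs. Which physical consumer gets which range depends on how the member ids sort, so the sketch does not predict which instance plays the role of s1.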

With the three consumers S1, S2, and S3 running, start the producer and send 10 records:

=======================================================================
##S1
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 7, CreateTime = 1632753328402, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key0, value = value0)
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 8, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key3, value = value3)
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 9, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key5, value = value5)
ConsumerRecord(topic = topic02, partition = 1, leaderEpoch = 2, offset = 10, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key8, value = value8)

=======================================================================
##S2
ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 5, CreateTime = 1632753328415, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key1, value = value1)
ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 6, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key2, value = value2)
ConsumerRecord(topic = topic02, partition = 2, leaderEpoch = 2, offset = 7, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key9, value = value9)

=======================================================================
##S3
ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key4, value = value4)
ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key6, value = value6)
ConsumerRecord(topic = topic02, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1632753328416, serialized key size = 4, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = key7, value = value7)

Conclusion: records are consumed in order within a partition, but there is no ordering guarantee across partitions.
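
To make the ordering guarantee concrete, the sketch below replays the ten keys in send order, using the key-to-partition placement observed in the consumer logs above, and collects them per partition. The class name PartitionOrderSketch and the hardcoded placement table are illustrative assumptions; on a real cluster the producer's partitioner decides the placement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replay of the 10 records from the logs above.
final class PartitionOrderSketch {

    // Partition observed for key0..key9 in the consumer logs (index = key number).
    static final int[] OBSERVED_PARTITION = {1, 2, 2, 1, 0, 1, 0, 0, 1, 2};

    // Appends each key to the log of its partition, in send order.
    static Map<Integer, List<String>> replay() {
        Map<Integer, List<String>> logs = new TreeMap<>();
        for (int i = 0; i < OBSERVED_PARTITION.length; i++) {
            logs.computeIfAbsent(OBSERVED_PARTITION[i], p -> new ArrayList<>()).add("key" + i);
        }
        return logs;
    }

    public static void main(String[] args) {
        replay().forEach((partition, keys) ->
                System.out.println("partition " + partition + ": " + keys));
    }
}
```

Each per-partition list preserves send order (for example key0, key3, key5, key8 in partition 1), but reading the partitions independently, as three consumers do, gives no single global order across all ten keys.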

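4. Consuming from a specific partition

With manual assignment you can also choose the offset at which consumption starts. The consumer gives up consumer-group behavior: the consumer instances are completely independent of each other.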

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerDemo_1 {
    // Several instances can run at the same time; they are independent of each other
    public static void main(String[] args) {
        // 1. Create the KafkaConsumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // properties.put(ConsumerConfig.GROUP_ID_CONFIG,"g2");  No group id is needed with manual assignment

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        TopicPartition partition0 = new TopicPartition("topic02", 0);
        List<TopicPartition> topics = Arrays.asList(partition0);
        // Assign the topic and partition to consume
        kafkaConsumer.assign(topics);
        // Optionally set the offset to start from
        kafkaConsumer.seek(partition0, 3);

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
            }
        }
    }
}

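4.1 Load balancing

With three consumers running, sending 30 records produces the logs below. For records that have a key, the default partitioner chooses the partition by hashing the key, not by round-robin.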

##S1  ==> 11 records
key4
key6
key7
key10
key13
key14
key23
key24
key25
key26
key29

##S2  ==> 9 records
key1
key2
key9
key11
key12
key19
key20
key21
key22
##S3  ==> 10 records
key0
key3
key5
key8
key15
key16
key17
key18
key27
key28
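
The uneven counts above (11, 9, 10) are typical of hash-based placement. The sketch below contrasts hash-based placement with sequential round-robin for the same 30 keys. Note the assumption: it uses String.hashCode() as a stand-in hash purely for illustration, whereas Kafka's default partitioner applies murmur2 to the serialized key bytes, so the per-partition counts it prints will not reproduce the logs above. The class name PlacementSketch is hypothetical.

```java
import java.util.Arrays;

// Hypothetical comparison of hash-based and round-robin placement.
final class PlacementSketch {

    // Counts records per partition when the partition is derived from a hash of the key.
    // String.hashCode() is a stand-in here, NOT the hash Kafka uses.
    static int[] hashCounts(int records, int partitions) {
        int[] counts = new int[partitions];
        for (int i = 0; i < records; i++) {
            int hash = ("key" + i).hashCode();
            counts[Math.floorMod(hash, partitions)]++;
        }
        return counts;
    }

    // Counts records per partition when records are dealt out in turn.
    static int[] roundRobinCounts(int records, int partitions) {
        int[] counts = new int[partitions];
        for (int i = 0; i < records; i++) {
            counts[i % partitions]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("hash:        " + Arrays.toString(hashCounts(30, 3)));
        System.out.println("round-robin: " + Arrays.toString(roundRobinCounts(30, 3)));
    }
}
```

Round-robin always yields 10 records per partition for 30 records and 3 partitions. A hash gives no such guarantee, but it does send every record with the same key to the same partition, which is what preserves per-key ordering.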

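4.2 Writing a simple round-robin partitioner

By implementing the Partitioner interface and registering the class in the KafkaProducer configuration, you can control which partition each record is sent to.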

// Producer with a custom partitioner
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * @Description Write data to partitions chosen by a custom partitioner
 * @Date 23:47 2021/9/27
 **/
public class KafkaProducerDemo_2 {

    public static void main(String[] args) {
        // 1. Create the KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic02", "key" + i, "value" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }

}
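// MyPartitioner is the partitioning strategy class; it must implement the Partitioner interface.
// It spreads the keys "key0", "key1", ... across the partitions by their numeric suffix.
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {
    public MyPartitioner() {
    }
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Number of partitions of the target topic
        int numPartitions = cluster.partitionsForTopic(topic).size();
        try {
            int keyNumber = Integer.parseInt(String.valueOf(key).replace("key", ""));
            // floorMod keeps the result in [0, numPartitions) even for negative numbers
            return Math.floorMod(keyNumber, numPartitions);
        } catch (NumberFormatException e) {
            // Keys without a numeric suffix (or null keys) go to partition 1 when it exists
            return 1 % numPartitions;
        }
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}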
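
The partitioner above produces a round-robin pattern only because the keys happen to carry consecutive numbers. A key-independent round-robin can be sketched with a shared counter, as below. The class RoundRobinCounter is hypothetical; inside a real Partitioner the partition count would come from cluster.partitionsForTopic(topic).size(). Kafka also ships its own org.apache.kafka.clients.producer.RoundRobinPartitioner, which may be preferable to a hand-written one.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: hands out partition numbers in turn, ignoring the record key.
final class RoundRobinCounter {

    // AtomicInteger because a producer may call the partitioner from several threads.
    private final AtomicInteger next = new AtomicInteger();

    int nextPartition(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even after the counter overflows
        return Math.floorMod(next.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinCounter counter = new RoundRobinCounter();
        StringBuilder sequence = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            sequence.append(counter.nextPartition(3)).append(' ');
        }
        System.out.println(sequence.toString().trim()); // 0 1 2 0 1 2 0
    }
}
```

The trade-off is that records with the same key no longer land in the same partition, so per-key ordering is lost.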

5. Sending serialized objects

An existing serialization library can be used:

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>

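Object serializer

import java.io.Serializable;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

public class ObjectSerializer implements Serializer<Object> {
    @Override
    public byte[] serialize(String topic, Object data) {
        // The value must implement java.io.Serializable
        return SerializationUtils.serialize((Serializable) data);
    }
}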

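Object deserializer

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

public class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }
}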
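
SerializationUtils relies on standard Java serialization, which is why the value class has to implement Serializable. The following sketch shows an equivalent round trip using only the JDK; the class name JdkSerializationSketch and its helper methods are illustrative and not part of Kafka or commons-lang3.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Date;

// Hypothetical JDK-only equivalent of a serialize/deserialize round trip.
final class JdkSerializationSketch {

    // Object -> bytes, as a value serializer would do before the record is sent.
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Bytes -> object, as a value deserializer would do after the record is fetched.
    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of the serialized object is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        Date original = new Date(0L);
        Object copy = fromBytes(toBytes(original));
        System.out.println(original.equals(copy)); // true
    }
}
```

Both sides need the same class on the classpath, and Java deserialization of data from untrusted sources is generally considered unsafe, so this approach fits test setups better than open systems.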

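Test object

import java.io.Serializable;
import java.util.Date;
public class DemoObj implements Serializable {
    private Integer f1;
    private String  f2;
    private Date f3;
    // The producer creates instances through this constructor; the consumer prints them via toString()
    public DemoObj(Integer f1, String f2, Date f3) { this.f1 = f1; this.f2 = f2; this.f3 = f3; }
    @Override
    public String toString() { return "DemoObj{f1=" + f1 + ", f2='" + f2 + "', f3=" + f3 + '}'; }
}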

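Running the test

Producer

import java.util.Date;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerObjectDemo {

    public static void main(String[] args) {
        // 1. Create the KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Changed: use the custom object serializer for values
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());
        KafkaProducer<String, DemoObj> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            // Changed: the value is now a DemoObj
            ProducerRecord<String, DemoObj> record = new ProducerRecord<>("topic04", "key" + i, new DemoObj(i, UUID.randomUUID().toString(), new Date()));
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}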

Consumer

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerObjectDemo {

    public static void main(String[] args) {
        // 1. Create the KafkaConsumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Changed: use the custom object deserializer for values
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");
        KafkaConsumer<String, DemoObj> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("topic04"));
        while (true) {
            // Changed: the value type is now DemoObj
            ConsumerRecords<String, DemoObj> records = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, DemoObj> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

Test result: the objects are deserialized successfully

DemoObj{f1=0, f2='22c16c50-f703-4b78-ba1e-8fb2f58d9801', f3=Tue Sep 28 23:06:38 CST 2021}
DemoObj{f1=3, f2='abd6089a-3ac8-4952-b184-d09f94336893', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=5, f2='4732aafa-8a24-42b1-941d-b5efd72395e9', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=8, f2='08a56491-f238-4bba-afa5-008a39577756', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=4, f2='7bf2cb8b-e898-4581-b3ea-fecda46ba899', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=6, f2='0c14cf73-96e3-4f89-999b-5662e86900bf', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=7, f2='aa88582b-59ef-449c-a86d-c147aa1815e7', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=1, f2='b94e35a0-69ce-4190-b111-b9a65e9d60ed', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=2, f2='57c54244-9522-4689-97b3-411bd2fc587c', f3=Tue Sep 28 23:06:39 CST 2021}
DemoObj{f1=9, f2='6a3c5526-c015-425b-bdda-e242b1fda780', f3=Tue Sep 28 23:06:39 CST 2021}

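6. Custom interceptor

An interceptor lets you hook into the sending of records, for example to handle send failures.

Defining the interceptor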

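import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {
    // Wrap the message further before it is sent
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Keep the partition, timestamp, and headers; only the value is changed
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), record.value() + "sffffffffff", record.headers());
    }
    // Called on both success and failure; exception is null on success
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println(metadata);
        System.out.println(exception);
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}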

Producer configuration

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerObjectInterceptorDemo {

    public static void main(String[] args) {
        // 1. Create the KafkaProducer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Added: register the interceptor
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaProducerInterceptor.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topic07", "key" + i, "value" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

The consumer is unchanged.

Consumer log

// The interceptor above rewrote the value of each ProducerRecord
// Printed message values
value0sffffffffff
value3sffffffffff
value5sffffffffff
value8sffffffffff
value1sffffffffff
value2sffffffffff
value9sffffffffff

Producer log

// Output of onAcknowledgement in the interceptor above, called after each send
topic07-0@10
null
topic07-0@11
null
topic07-0@12
null
Added: 2021-09-29 10:21:27  Updated: 2021-09-29 10:22:18
 