| 1:offset自动控制Kafka消费者默认对于未订阅的topic的offset的时候,也就是系统并没有存储该消费者的消费分区的记录信息,默认Kafka消费者的默认首次消费策略:latest auto.offset.reset=latest earliest - 自动将偏移量重置为最早的偏移量latest - 自动将偏移量重置为最新的偏移量none - 如果未找到消费者组的先前偏移量,则向消费者抛出异常
 Kafka消费者在消费数据的时候默认会定期的提交消费的偏移量,这样就可以保证所有的消息至少可以被消费者消费1次,用户可以通过以下两个参数配置://是否自动提交offset
 enable.auto.commit = true 默认
 //自动提交时,多久提交
 auto.commit.interval.ms = 5000 默认
 如果用户需要自己管理offset的自动提交,可以关闭offset的自动提交,手动管理offset提交的偏移量,注意用户提交的offset偏移量永远都要比本次消费的偏移量+1,因为提交的offset是kafka消费者下一次抓取数据的位置。 1:消费者自动提交offsetpackage com.kafka.offset;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
public class KafkaConsumerDemo_01 {
    public static void main(String[] args) {
        
        Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,10000);
        
        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
        
        consumer.subscribe(Pattern.compile("^topic.*$"));
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()){
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);
            }
        }
    }
}
 2:手动提交
 package com.kafka.offset;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
public class KafkaConsumerDemo_02 {
    public static void main(String[] args) {
        
        Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        
        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
        
        consumer.subscribe(Pattern.compile("^topic.*$"));
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()){
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<TopicPartition, OffsetAndMetadata>();
                offsets.put(new TopicPartition(record.topic(),partition),new OffsetAndMetadata(offset));
                consumer.commitAsync(offsets, new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        System.out.println("完成:"+offset+"提交!");
                    }
                });
                System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);
            }
        }
    }
}
 3:指定offset消费auto.offset.reset = earliest | latest | none 默认是 latest。当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
 (1)earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
 (2)latest(默认值):自动将偏移量重置为最新偏移量。
 (3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
 
  
  2:Ackes应答和Retores重试Kafka生产者在发送完一个的消息之后,要求Broker在规定的额时间Ack应答答,如果没有在规定时间内应答,Kafka生产者会尝试n次重新发送消息。 acks=1 默认 acks=1 - Leader会将Record写到其本地日志中,但会在不等待所有Follower的完全确认的情况下做出响应。在这种情况下,如果Leader在确认记录后立即失败,但在Follower复制记录之前失败,则记录将丢失。acks=0 - 生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区中并视为已发送。在这种情况下,不能保证服务器已收到记录。acks=all - 这意味着Leader将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于acks = -1设置。
 如果生产者在规定的时间内,并没有得到Kafka的Leader的Ack应答,Kafka可以开启reties机制。 request.timeout.ms = 30000 默认等待应答时间retries = 2147483647 默认重试次数
 
 package com.kafka.acks;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerDemo_02 {
    public static void main(String[] args) {
        
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefineProducerInterceptor.class.getName());
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1);
        props.put(ProducerConfig.ACKS_CONFIG,"-1");
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        
        
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
        
        for(Integer i=0;i< 1;i++){
            ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
            producer.send(record);
        }
        producer.close();
    }
}
 3:幂等写HTTP/1.1中对幂等性的定义是:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。 Kafka在0.11.0.0版本支持增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证上生产者发送的消息,不会丢失,而且不会重复。实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点: 唯一标识:要想区分请求是否重复,请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识 记录下已处理过的请求标识:光有唯一标识还不够,还需要记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复记录,拒绝掉。
  等又称为exactly once。要停止多次处理消息,必须仅将其持久化到Kafka Topic中仅仅一次。在初始化期间,kafka会给生产者生成一个唯一的ID称为Producer ID或PID。
 PID和序列号与消息捆绑在一起,然后发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID / TopicPartition对中最后提交的消息正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。 enable.idempotence= false 默认 注意:在使用幂等性的时候,要求必须开启retries=true和acks=all package com.kafka.acks;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerDemo_02 {
    public static void main(String[] args) {
        
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefineProducerInterceptor.class.getName());
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1);
        props.put(ProducerConfig.ACKS_CONFIG,"-1");
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
        
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
        
        for(Integer i=0;i< 1;i++){
            ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
            producer.send(record);
        }
        producer.close();
    }
}
 4:生产者事务在Kafka0.11.0.0除了引入的幂等性的概念,同时也引入了事务的概念。通常Kafka的事务分为 生产者事务Only、消费者&生产者事务。一般来说默认消费者消费的消息的级别是read_uncommited数据,这有可能读取到事务失败的数据,所有在开启生产者事务之后,需要用户设置消费者的事务隔离级别。 isolation.level = read_uncommitted 默认 该选项有两个值read_committed|read_uncommitted,如果开始事务控制,消费端必须将事务的隔离级别设置为read_committed 开启的生产者事务的时候,只需要指定transactional.id属性即可,一旦开启了事务,默认生产者就已经开启了幂等性。但是要求"transactional.id"的取值必须是唯一的,同一时刻只能有一"transactional.id"存储在,其他的将会被关闭。  事务的api一共有5个,如下:
 
void initTransactions();
void beginTransaction() throws ProducerFencedException;
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
void commitTransaction() throws ProducerFencedException;
void abortTransaction() throws ProducerFencedException;
 1:生产者事务package com.kafka.transactions;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerDemo01 {
    public static void main(String[] args) {
        
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction-id");
        
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(props);
        producer.initTransactions();
        try{
            producer.beginTransaction();
            
            for(Integer i=0;i< 10;i++){
                Thread.sleep(10000);
                ProducerRecord<String, String> record = new ProducerRecord<>("topic01", "key" + i, "value" + i);
                producer.send(record);
            }
            producer.commitTransaction();
        }catch (Exception e){
            producer.abortTransaction();
        }
        producer.close();
    }
}
 2:消费者事务package com.kafka.transactions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        
        Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"group01");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
        
        KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(props);
        
        consumer.subscribe(Pattern.compile("^topic.*$"));
        while (true){
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> recordIterator = consumerRecords.iterator();
            while (recordIterator.hasNext()){
                ConsumerRecord<String, String> record = recordIterator.next();
                String key = record.key();
                String value = record.value();
                long offset = record.offset();
                int partition = record.partition();
                System.out.println("key:"+key+",value:"+value+",partition:"+partition+",offset:"+offset);
            }
        }
    }
}
 5:消费者&生产者事务我们来设想一个场景:双十一的时候
 1:业务系统将订单消息放入了topicA中,
 2:仓库系统将订单从A中拿到并进行业务处理,然后将处理成功的订单消息发送给topicB
 3:消息通知系统将订单消息从B中拿到后,给用户发送消息;
 那么在以上业务场景中,一旦消息系统处理失败,俺么仓库系统也应该回滚,将订单在topic中的订单消息设为未处理,否则将会造成重复消费; 上述场景被称为消费者$生产者事务。 package com.kafka.transactions;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.*;
public class KafkaProducerDemo02 {
    public static void main(String[] args) {
        
        KafkaProducer<String,String> producer=buildKafkaProducer();
        KafkaConsumer<String, String> consumer = buildKafkaConsumer("group01");
        consumer.subscribe(Arrays.asList("topic01"));
        producer.initTransactions();
        try{
            while(true){
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                Iterator<ConsumerRecord<String, String>> consumerRecordIterator = consumerRecords.iterator();
                
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<TopicPartition, OffsetAndMetadata>();
                while (consumerRecordIterator.hasNext()){
                    ConsumerRecord<String, String> record = consumerRecordIterator.next();
                    
                    ProducerRecord<String,String> producerRecord=new ProducerRecord<String,String>("topic02",record.key(),record.value());
                    producer.send(producerRecord);
                    
                    offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1));
                }
                
                producer.sendOffsetsToTransaction(offsets,"group01");
                producer.commitTransaction();
            }
        }catch (Exception e){
            producer.abortTransaction();
        }finally {
            producer.close();
        }
    }
    public static KafkaProducer<String,String> buildKafkaProducer(){
        Properties props=new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction-id");
        return new KafkaProducer<String, String>(props);
    }
    public static KafkaConsumer<String,String> buildKafkaConsumer(String group){
        Properties props=new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
        return new KafkaConsumer<String, String>(props);
    }
}
 |