消费者
基本概念
- 消费者:消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。
- 消费组:每个消费者都有一个对应的消费组,并且只属于一个消费组。
如果所有消费者都属于同一个消费组,那么所有的消息都会被均衡地投递到每一个消费者,每一个消息只会被一个消费者处理。如果所有的消费者都属于不同消费组,那么消息就会被广播给所有的消费者。
当某个主题的分区数小于同一消费组内的消费者个数时,就会出现有的消费者分配不到分区、因而接收不到消息的情况。
代码
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
/**
 * Minimal Kafka consumer demo.
 *
 * <p>Builds a consumer configuration, subscribes to a single topic with a
 * rebalance listener, performs one {@code poll}, prints every field of each
 * received record and then commits the consumed offsets synchronously.
 *
 * <p>The consumer is opened in a try-with-resources block so that it is always
 * closed, even when polling or committing throws.
 */
public class MyConsumer1 {

    /** Topic this demo subscribes to and reads records from. */
    private static final String TOPIC = "topic_1";

    /**
     * Runs the demo: configure, subscribe, poll once, print, commit.
     *
     * @param args unused
     * @throws InterruptedException kept for signature compatibility
     * @throws ExecutionException   kept for signature compatibility
     * @throws TimeoutException     kept for signature compatibility
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();

        // Connection and (de)serialization: Integer keys, String values.
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:port");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Fetch / poll sizing.
        configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        configs.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 500000);
        configs.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        configs.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 500000);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

        // Network and metadata settings.
        configs.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 500000);
        configs.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, false);
        configs.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65565);
        configs.put(ConsumerConfig.SEND_BUFFER_CONFIG, 65565);
        configs.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        configs.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
        configs.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 30);
        configs.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 30);
        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");

        // Group membership and offset handling. Auto-commit is disabled, so the
        // explicit commitSync() after processing is what persists progress.
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources guarantees close() runs on every exit path.
        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs)) {
            final List<String> topics = Arrays.asList(TOPIC);
            consumer.subscribe(topics, new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    // Report each partition taken away from this consumer.
                    collection.forEach(tp -> {
                        System.out.println("剥夺的分区:" + tp.partition());
                    });
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    // Report each partition newly handed to this consumer.
                    collection.forEach(tp -> {
                        System.out.println(tp.partition());
                    });
                }
            });

            // Single poll: wait up to 3 seconds for a batch of records.
            final ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(3));
            final Iterable<ConsumerRecord<Integer, String>> topicIterable = records.records(TOPIC);
            topicIterable.forEach(record -> {
                System.out.println("===============================");
                System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
                System.out.println("消息的key:" + record.key());
                System.out.println("消息的偏移量(当前消费到的位置):" + record.offset());
                System.out.println("消息的分区号:" + record.partition());
                System.out.println("消息的序列化key字节数:" + record.serializedKeySize());
                System.out.println("消息的时间戳:" + record.timestamp());
                System.out.println("消息的序列化value字节数:" + record.serializedValueSize());
                System.out.println("消息的时间戳类型:" + record.timestampType());
                System.out.println("消息的主题:" + record.topic());
                System.out.println("消息的值:" + record.value());
            });

            // Commit only after every record of the batch has been handled.
            consumer.commitSync();
        }
    }
}
订阅及取消订阅方式
// Excerpt of the Kafka Consumer interface: only the methods that establish or
// cancel a subscription are listed here.
public interface Consumer<K, V> extends Closeable {
// Subscribe to the topics named in the collection.
void subscribe(Collection<String> topics);
// Same as above, plus a listener whose onPartitionsRevoked / onPartitionsAssigned
// callbacks are invoked with the affected partitions.
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
// Subscribe by regular expression instead of explicit names, with a rebalance listener.
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
// Subscribe by regular expression without a rebalance listener.
void subscribe(Pattern pattern);
// Takes explicit TopicPartitions rather than topic names.
// NOTE(review): presumably this is manual assignment that bypasses group-managed
// balancing — confirm against the KafkaConsumer Javadoc.
void assign(Collection<TopicPartition> partitions);
// Cancel the current subscription.
void unsubscribe();
}
// Excerpt of the Kafka Consumer interface: partition metadata lookup.
public interface Consumer<K, V> extends Closeable {
// Returns one PartitionInfo per partition of the given topic; useful for building
// the TopicPartition collection that assign(...) expects.
List<PartitionInfo> partitionsFor(String topic);
}
// NOTE(review): per the KafkaConsumer Javadoc, subscribing with an empty topic
// collection is treated the same as unsubscribe() — confirm for the client version in use.
consumer.subscribe(new ArrayList<String>());
// NOTE(review): likewise, assigning an empty partition collection is documented as
// equivalent to unsubscribe() — confirm for the client version in use.
consumer.assign(new ArrayList<TopicPartition>());
// Queries the partitions currently assigned to this consumer; the result is discarded here.
consumer.assignment();
位移提交
// Synchronous offset commit with no arguments.
// NOTE(review): per the KafkaConsumer Javadoc this commits the offsets returned by
// the last poll() for all subscribed partitions and blocks until it completes — confirm.
consumer.commitSync();
// Finest-grained commit: poll one batch, then commit synchronously after every
// single record. The committed value is the record's offset + 1, i.e. the
// position of the next record to consume on that partition.
final ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(3));
for (ConsumerRecord<Integer, String> current : records.records("topic_1")) {
    TopicPartition owner = new TopicPartition(current.topic(), current.partition());
    OffsetAndMetadata nextPosition = new OffsetAndMetadata(current.offset() + 1);
    consumer.commitSync(Collections.singletonMap(owner, nextPosition));
}
// Per-partition commit: poll one batch, handle each partition's records, then
// commit that partition's progress once.
final ConsumerRecords<Integer,String> records = consumer.poll(Duration.ofSeconds(3));
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<Integer, String>> consumerRecords = records.records(partition);
    for (ConsumerRecord<Integer, String> consumerRecord : consumerRecords) {
        // Process consumerRecord here.
    }
    // The committed offset must be the position of the NEXT record to read,
    // i.e. last processed offset + 1. Committing the last offset itself would
    // make the final record of each partition be delivered again after a
    // restart or rebalance.
    long lastProcessedOffset = consumerRecords.get(consumerRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition,
            new OffsetAndMetadata(lastProcessedOffset + 1)));
}
// Asynchronous counterpart of commitSync().
// NOTE(review): per the KafkaConsumer Javadoc this does not block and, without a
// callback, commit failures are not reported to the caller — confirm.
consumer.commitAsync();
拦截器
// Excerpt of the Kafka ConsumerInterceptor interface: hooks a consumer-side
// interceptor implements.
public interface ConsumerInterceptor<K, V> extends Configurable {
// Receives a batch of records and returns the batch to pass on, so an
// implementation can inspect, filter or replace records.
// NOTE(review): per the Kafka Javadoc this runs just before poll() returns — confirm.
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
// Receives the per-partition offsets involved in a commit.
// NOTE(review): per the Kafka Javadoc this is invoked when offsets get committed — confirm.
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
// Releases whatever the interceptor holds when it is shut down.
public void close();
}
// Registers the interceptor on the consumer configuration. The string value is a
// placeholder ("class name") to be replaced with the interceptor implementation's class name.
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"类名");
|