Table of Contents
I. What is a Producer
II. Producer initialization code flow
2.1 Where the source code lives
2.2 Reading the source
1. What methods does the producer have
2. The producer's fields
3. The producer's constructor
4. Constructing the core pieces: RecordAccumulator and the Kafka ioThread
I. What is a Producer
A Kafka cluster involves three roles: producer, broker, and consumer. The producer is the data producer; its job is to send data to the cluster. The consumer is the data consumer, reading data from the cluster. A broker can be thought of as a Kafka cluster instance, i.e. one running Kafka service. The figure below describes what the producer has to take care of.
[Figure: overview of the producer's responsibilities]
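Before digging into the internals, here is a minimal sketch of what "sending data to the cluster" looks like from the caller's side. The broker address localhost:9092 and the topic demo-topic are placeholders for this illustration, not anything taken from the source.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducerDemo {
    public static void main(String[] args) {
        // Point the client at the cluster and tell it how to serialize keys/values.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // send() is asynchronous: the record is buffered and shipped by a background thread.
            producer.send(new ProducerRecord<>("demo-topic", "key-1", "hello kafka"));
            producer.flush();  // force any buffered records out now
        } finally {
            producer.close();
        }
    }
}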
II. Producer initialization code flow
2.1 Where the source code lives
The producer implementation (KafkaProducer) analyzed below lives at:
org.apache.kafka.clients.producer.KafkaProducer
The Producer interface lives at:
org.apache.kafka.clients.producer.Producer
2.2 Reading the source
1. What methods does the producer have
First, look at what behaviors the producer class exposes, that is, what it has to be able to do once it is initialized. Concretely it sends data, flushes, handles partitioning, and closes, and the configuration that producer initialization needs later is basically tied to these behaviors; the abridged interface below lists them.
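For orientation, this is roughly what the Producer interface declares in the 0.10.x-era client this article reads from (abridged; javadoc omitted):

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;

// Abridged view of org.apache.kafka.clients.producer.Producer.
public interface Producer<K, V> extends Closeable {
    Future<RecordMetadata> send(ProducerRecord<K, V> record);                    // send data
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback); // send with a completion callback
    void flush();                                                                // flush buffered records
    List<PartitionInfo> partitionsFor(String topic);                             // query a topic's partitions
    Map<MetricName, ? extends Metric> metrics();                                 // client metrics
    void close();                                                                // close the producer
    void close(long timeout, TimeUnit unit);                                     // close with a bounded wait
}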
2. The producer's fields
In this part of the initialization flow we first skim roughly what the producer needs at construction time and which fields get assigned during initialization.
Generally speaking, initializing an instance means outside code calls the class's constructor, and the constructor's parameter list tells you what has to be supplied.
For example, to initialize a Student class, the parameters passed in correspond to fields defined inside the class.
public class Student {
    String name;
    Integer year;

    public Student(String name, Integer year) {
        this.name = name;
        this.year = year;
    }
}
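Initializing it from the outside then just means handing the constructor a value for each parameter, for example (the name and year here are made up):

// Somewhere outside the Student class:
Student s = new Student("Alice", 2024);  // each argument is assigned to the matching field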
Next, let's look at the producer's fields. In the source you can see the static constants, all declared final, and almost all of the instance fields are final as well (clientId is the one exception). In other words, they can only be assigned once; after assignment the value cannot change. (Why it is designed this way will be analyzed later.)
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
private static final String JMX_PREFIX = "kafka.producer";
// client id (generated if not configured)
private String clientId;
// the partitioner that decides which partition each record goes to
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
// cluster metadata: which topics/partitions exist and which brokers hold them
private final Metadata metadata;
// record accumulator: buffers records into batches before they are sent
private final RecordAccumulator accumulator;
private final Sender sender;
private final Metrics metrics;
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
private final int requestTimeoutMs;
private final ProducerInterceptors<K, V> interceptors;
We won't analyze each of these fields in detail yet. For now it is enough to understand roughly that they determine things such as how large the producer's send buffer is and how keys and values are serialized while sending. A rough mapping from these fields to the configs that populate them is sketched below.
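The sketch below is illustrative only: it lists the producer configs that end up populating most of the fields above, with placeholder values rather than recommendations.

import java.util.Properties;

public class ProducerFieldConfigSketch {
    public static Properties sketch() {
        Properties props = new Properties();
        props.put("client.id", "demo-producer");        // -> clientId
        props.put("max.request.size", "1048576");       // -> maxRequestSize
        props.put("buffer.memory", "33554432");         // -> totalMemorySize
        props.put("compression.type", "none");          // -> compressionType
        props.put("max.block.ms", "60000");             // -> maxBlockTimeMs
        props.put("request.timeout.ms", "30000");       // -> requestTimeoutMs
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");   // -> keySerializer
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");   // -> valueSerializer
        return props;
    }
}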
3. The producer's constructor
Below is the complete constructor. There is a lot going on, and at first glance it can be a bit overwhelming. It helps to then look at a trimmed-down version of the core code: once the metrics, the serializer setup, and the loading of a few old deprecated configs are stripped away, the constructor boils down to two core pieces of code (extracted in the next subsection).
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
log.trace("Starting the Kafka producer");
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = new SystemTime();
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
// the Kafka partitioner
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
// retry backoff; the default is 100 ms
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
/* check for user defined settings.
* If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
* This should be removed with release 0.9 when the deprecated configs are removed.
*/
if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
if (blockOnBufferFull) {
this.maxBlockTimeMs = Long.MAX_VALUE;
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
/* check for user defined settings.
* If the TIME_OUT config is set use that for request timeout.
* This should be removed with release 0.9
*/
if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
} else {
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
}
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time);
// parse the bootstrap server addresses
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
// build the channel builder
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
// create the network client
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs, time);
// create the Sender that drives network I/O
this.sender = new Sender(client,
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
// start the network I/O thread (the background I/O mentioned earlier)
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
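One detail worth noting before moving on: the catch (Throwable t) block at the end (referencing KAFKA-2121) closes whatever was already constructed before rethrowing, so a half-built producer does not leak sockets or threads. A minimal sketch of that pattern, with hypothetical class and resource names rather than Kafka code, looks like this:

import java.io.IOException;
import java.net.Socket;

// "Clean up on constructor failure": SafeClient, its fields, and the host/port
// parameters are made up for illustration.
public class SafeClient implements AutoCloseable {
    private Socket socket;     // stands in for network resources
    private Thread ioThread;   // stands in for the background I/O thread

    public SafeClient(String host, int port) {
        try {
            this.socket = new Socket(host, port);                        // step 1: may throw
            this.ioThread = new Thread(() -> { /* I/O loop */ }, "io");  // step 2
            this.ioThread.setDaemon(true);
            this.ioThread.start();
        } catch (Throwable t) {
            close();  // release anything already constructed, then propagate
            throw new RuntimeException("Failed to construct client", t);
        }
    }

    @Override
    public void close() {
        if (ioThread != null) ioThread.interrupt();
        if (socket != null) {
            try { socket.close(); } catch (IOException ignored) { }
        }
    }
}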
4. Constructing the core pieces: RecordAccumulator and the Kafka ioThread
The core code of the constructor: the construction of the RecordAccumulator and of the Kafka ioThread. What clever tricks these two pieces hide, I'll write up once I've figured them out.
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time);
// parse the bootstrap server addresses
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
// build the channel builder
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
// create the network client
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs, time);
// create the Sender that drives network I/O
this.sender = new Sender(client,
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
// start the network I/O thread (the background I/O mentioned earlier)
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
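To make the "background I/O" idea a bit more concrete: the Sender is just a Runnable that loops until it is asked to stop, and KafkaThread is essentially a named Thread with setDaemon(true). A minimal sketch of that pattern (DemoSender, the thread name, and the sleep intervals are made up for illustration):

public class BackgroundSenderDemo {

    // Stand-in for Sender: a Runnable that keeps working until shut down.
    static class DemoSender implements Runnable {
        private volatile boolean running = true;

        @Override
        public void run() {
            while (running) {
                // Real Sender work: drain ready batches from the RecordAccumulator
                // and write them to the network. Here we just idle.
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    return; // interrupted: exit the loop
                }
            }
        }

        void shutdown() {
            running = false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DemoSender sender = new DemoSender();
        Thread ioThread = new Thread(sender, "kafka-producer-network-thread | demo-client");
        ioThread.setDaemon(true);  // a daemon thread will not keep the JVM alive by itself
        ioThread.start();

        Thread.sleep(300);         // pretend the application does some sends here
        sender.shutdown();         // ask the loop to stop
        ioThread.join();
    }
}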