Kafka结构
结构图示
Kafka优点
1.可以无缝的支持多个生产者,不管客户端在使用单个主题还是多个主题。所以它很适合从多个前端系统收集数据,并以统一的格式对外提供数据。 2.支持多个消费者从消息流上读取数据,而且消费者之间互不影响。 3.基于磁盘的数据存储允许消费者非实时的读取消息。 4.具有灵活的伸缩性,可以先使用单个broker,再逐渐扩大。 总结:解除了生产者和消费者的耦合,通过基于磁盘的数据存储允许消费者非实时的读取消息,且broker可灵活拓展,这些特性使Kafka可以轻松的处理巨大的信息流,同时保证亚秒级的消息延迟。
broker部分
Zookeeper配置
Zookeeper配置中列出的地址格式遵循servier.X=hostname:peerPort:leaderPort的格式 hostname:服务器主机名或IP地址 peerPort:用于节点间通信的TCP端口 leaderPort:用于首领选举的TCP端口
broker和Zookeeper的连接
可以有多个Kafka集群连接一个Zookeeper群组 broker通过设置zookeeper.connect与Zookeeper连接,格式为hostname:post/path hostname:Zookeeper服务器的主机名或IP地址 post:Zookeeper的客户连接端口 path:可选的Zookeeper路径,作为Kafka集群的chroot环境
broker配置
broker通过log.dir设置日志存放目录和分区,通过可配置的线程池处理日志片段,当添加新的broker到集群中时,只需修改zookeeper.connect和broker.id两个配置参数 此外,有多个参数用于设置broker关闭文件的存储大小、时间和删除文件的存储大小、时间 (有关系统调优和硬件配置详见《Kafka权威指南》第二章)
tips:
集群的好处:
- 负载均衡
- 通过复制来避免单点故障造成的数据丢失和吞吐量低造成的性能问题
向Kafka写入数据
过程图示
生产者
生产者配置
三个必有属性:
- bootstrap.servers,格式为host:port,通过给定的broker可以找到其他的broker信息,但至少提供两个broker信息,以防其中一个宕机
- key.serializer,提供类将键对象序列化成字节数组
- value.serializer,将值序列化
代码示例:
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9002,broker2:9092);
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer=new KafkaProducer<String, String>(kafkaProps);
发送消息的三种方式
- 发送并忘记
不关心消息是否正常到达,有时候会丢失一些消息
ProducerRecord<String,String> record=new ProducerRecord<>("CustomerCountry","Precision Products","France");
try{
producer.send(record);
}catch(Exception e){
e.printStackTrace();
}
- 同步发送
用send()方法发送消息,返回一个Future对象,调用get()方法进行等待以知道是否发送成功
ProducerRecord<String,String> record=new ProducerRecord<>("CustomerCountry","Precision Products","France");
try{
producer.send(record).get();
}catch(Exception e){
e.printStackTrace();
}
- 异步发送
调用sent()方法,并指定一个回调函数,服务器在返回响应时调用该函数
private class DemoProducerCallback implements Callback{
@Override
public void onCompletion(RecordMetadaata recordMetadata,Exception e){
if(e!=null){
e.printStackTrace();
}
}
}
ProducerRecord<String,String> record=new ProducerRecord<>("CustomerCountry","Precision Products","France");
producer.send(record,new DemoProducerCallback());
序列化器
用于将生产者发送的对象转化为字节数组 例:JOSN、Avro、Thrift、Protobuf
|