IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka--基准测试,javaAPI,分片与副本,生产者-broker-消费者数据的防丢措施,消息存储和查询,数据分发策略,数据负载均衡 -> 正文阅读

[大数据]kafka--基准测试,javaAPI,分片与副本,生产者-broker-消费者数据的防丢措施,消息存储和查询,数据分发策略,数据负载均衡

kakfa的基准测试

    1. 创建一个topic : 在实际上生产中, 可以创建多个 拥有不同数量的分片和副本topic
    ./kafka-topics.sh  --create --zookeeper node1:2181,node2:2181,node3:2181 
    --topic test02 --partitions 3 --replication-factor 1
    
    1. 测试写入效率:
    ./kafka-producer-perf-test.sh --topic test02 --num-records 5000000   
    --throughput -1 --record-size 1000 --producer-props 
    bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
    
    属性说明:
    	--num-records  :  测试消息的条数
    	--throughput  : 是否需要限流  -1 不指定
    	--record-size :  每条数据的字节大小
    	acks : 消息确认方案
    

在这里插入图片描述

    1. 测试读取效率:
    ./kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092  
    --topic test02 --fetch-size 1048576 --messages 5000000
    
    属性说明:
    	--fetch-size 每次拉取的数量大小
    	--messages : 消息总条数
    	
    

在这里插入图片描述
总结

前提: 假设broker数量是无限的
	1) 当topic分片的数量越多, 读写效率越高
	2) topic的副本数量越多, 对读写效率影响越大

kafka的javaAPI操作

    1. 创建maven项目, 并导入相关的依赖
        <repositories><!--代码库-->
            <repository>
                <id>aliyun</id>
                <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
                <releases><enabled>true</enabled></releases>
                <snapshots>
                    <enabled>false</enabled>
                    <updatePolicy>never</updatePolicy>
                </snapshots>
            </repository>
        </repositories>
    
        <dependencies>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.4.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-io</artifactId>
                <version>1.3.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.6</version>
            </dependency>
    
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.16</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <target>1.8</target>
                        <source>1.8</source>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    1. 创建包结构:
      producer
      consumer

使用java API 将数据生产到 kafka
代码实现:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// 模拟kafka生产者:
public class KafkaProducerTest {

    public static void main(String[] args) {

        //1. 创建kafka生产者的核心类对象: KafkaProducer

        //1.1: 创建生产者配置对象: 设置相关配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all"); // 消息的确认方案
        // key序列化类型
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
        // value 序列化类型
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

        Producer<String, String> producer = new KafkaProducer<>(props);

        //2. 发送数据

        for (int i = 0; i < 10; i++) {
            //2.1: 创建 生产者数据承载对象  一个对象代表是一条消息数据
	 ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01",  Integer.toString(i));
            producer.send(producerRecord);
        }
        //3. 释放资源
        producer.close();

    }

}

使用javaAPI 消费kafka中数据
代码实现:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

// 模拟消费者JAVA API:
public class KafkaConsumerTest {


    public static void main(String[] args) {

        //1. 创建kafka的消费者的核心对象: KafkaConsumer

        //1.1: 创建消费者配置对象, 并设置相关的参数:
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.setProperty("group.id", "test"); // 消费者组的 id
        props.setProperty("enable.auto.commit", "true"); // 是否启动消费者自动提交消费偏移量
        props.setProperty("auto.commit.interval.ms", "1000"); // 每间隔多长时间提交一次偏移量: 单位 毫秒
         // key 反序列化
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         // val 发序列化
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //2. 给消费者设置订阅topic:
        consumer.subscribe(Arrays.asList("test01"));

        //3. 循环获取相关的消息数据
        while (true) {
            //3.1: 从kafka中获取消息数据: 参数表示等待超时时间
            //  注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // 3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 
            //一个对象就是一条消息
            for (ConsumerRecord<String, String> record : records) {
                String massage = record.value();

                System.out.println("消息数据为:"+massage);

            }
        }
    }
}

kafka的原理

kafka的分片与副本机制

  • topic的分片:
    • 描述: topic是一个逻辑架构, 理解为大大的容器, 而分片对这个大容器切割操作, 将其划为多个小的容器, 分别放置在不同的broker中, 进行分布式存储操作 , 分片数量与broker数量没有关系的
    • 作用:
      • 提高读写的效率, 或者说可以提升承受并发量
      • 解决单台节点存储容量有限的问题
  • topic的副本:
    • 描述: 对topic中每一个分片都可以构建多个副本, 副本的数量最多和broker节点数量是相等, 一般副本为 1~3
    • 作用:
      • 提升数据的可靠性, 保证数据不丢失
      • 会导致占用更多的磁盘空间, 冗余较大

kafka如何保证数据不丢失

生产端如何保证数据不丢失的
在这里插入图片描述
生产者是基于ack方案, 确保数据不丢失

生产端是如何保证数据不丢失:   ack 确认机制
0 :  当ACK确认级别设置为0时             
	生产者只管向broker发送数据, 并不去接收或者等待broker响应确认消息               
1:  当ACK确认级别设置为1时             
	生产者向broker发送消息, 需要等待对应topic的对应分片上主副本接收到消息后, 生产者认为消息发送成功了       
-1(ALL):  当ACK确认级别设置为-1(ALL)时               
	生产者向broker发送消息, 需要等待对应topic的对应分片上所有的副本都接收到数据后, 生产者认为数据发送成功了

效率角度:  0 > 1 > -1
安全角度:  -1 > 1 > 0

思考: 在实际生产中, 一般设置ack为多少呢?   三种都需要设置    
一般会根据数据的重要程度, 以及数据发生的频率确定合适的ack方案如何设置生产者ack方案呢?  props.put("acks", "all"); 

相关面试题:

生产者数据不丢失相关的面试题:      
1) 生产者发送一条数据, broker需要给予ack响应, 如果broker迟迟没有给与ack响应, 如何解决呢?     
	解决方案:             
		设置超时时间, 如果过了超时时间, 依然没有给与响应, 可以尝试进行重试策略(一般重试3次), 如果依然没有响应呢, 
		程序直接抛出异常, 通知相关人员进行处理       
2) 生产者发送一次消息, 就需要一次ack响应, 请问这样是否会对宽带带宽造成更大的影响呢? 如何解决呢       
	解决方案:  会产生影响              
		引入缓存池, 采用异步批量发送数据操作, 一批一批发送, 当缓存池中数据达到一批后, 就会触发执行发送操作        
3) 采用一批一批发送操作 如果broker又没有给与响应, 此时缓存池中数据以及满了, 如何解决呢?          
	解决方案:                  
		程序员可以选择直接清空缓存池或者不清空, 将缓存池中数据存储在临时的容器中, 然后程序抛出异常 通知相关的人员,
		 重启后, 先加载临时容器中数据重新发送即可

扩展配置信息:

扩展相关的配置信息:       
1) 超时时间:  
	delivery.timeout.ms : 总超时时间 默认 120s            
	request.timeout.ms : 每次超时时间 默认 30s                
2) 重试次数: retries 默认值  2147483647             
	最终重试次数, 是由  总超时时间和每次超时时间计算得出          
3) 一批数据:              
	数据大小: batch.size  默认值为  16kb             
	时间阈值(间隔时间):  linger.ms  默认值为  0        
4) 缓冲池大小: buffer.memory  默认值为: 32M        

5) 如何进行同步和异步发送操作

所有的配置参数, 全部都需要设置到生产者的properties对象中
props.put(key,value)

生产者所有配置:
http://kafka.apache.org/24/documentation.html#producerconfigs

实现同步发送消息:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// 模拟kafka生产者_异步发送方式 (同步发送数据)
public class KafkaProducerTest02 {

	public static void main(String[] args) {

    //1. 创建kafka生产者的核心类对象: KafkaProducer

    //1.1: 创建生产者配置对象: 设置相关配置
    Properties props = new Properties();
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
    props.put("acks", "all"); // 消息的确认方案
     // key序列化类型
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     // value 序列化类型
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);

    //2. 发送数据

    for (int i = 0; i < 10; i++) {
        //2.1: 创建 生产者数据承载对象  一个对象代表是一条消息数据
       ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01",  Integer.toString(i));
        try {
            producer.send(producerRecord).get(); // 如果没有抛出异常, 说明消息发送成功了
        } catch (Exception e) {
            // 如果捕获到了异常 认为消息发送失败了(重试后的失败)
            // 在此处, 编写异常后处理业务代码....
            
            e.printStackTrace();
        }
    }
    //3. 释放资源
    producer.close();
}
}

实现异步发送消息:

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

// 模拟kafka生产者_异步发送方式 (异步有返回值)
public class KafkaProducerTest03 {

public static void main(String[] args) {

    //1. 创建kafka生产者的核心类对象: KafkaProducer

    //1.1: 创建生产者配置对象: 设置相关配置
    Properties props = new Properties();
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
    props.put("acks", "all"); // 消息的确认方案
     // key序列化类型
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     // value 序列化类型
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);

    //2. 发送数据

    for (int i = 0; i < 10; i++) {
        //2.1: 创建 生产者数据承载对象  一个对象代表是一条消息数据
       ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test01",  Integer.toString(i));
        producer.send(producerRecord, new Callback() {
            // 回调函数: 由于采用异步发送操作, 当触发send方法时候, 并没有直接将消息发送到broker端, 而是将消息
            //          存储到缓存池中即可, send方法相当于不断向缓冲池写入数据
            // 当缓存池子中数据达到一批数据大小后, 会专门有子线程进行数据发送broker端: 一批一批发送操作 
            //      发送完一批数据后, 就会调用一次这个回调函数
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                
                if(exception != null){
                    // 认为数据发送失败了...  
                    
                    // 编写数据发生失败的业务代码
                }
                
                
            }
        });
    }
    //3. 释放资源
    producer.close();
}

}

broker端如何保证数据不丢失

  • 通过副本机制来保证, 副本越多 数据可靠性越高
单独通过副本机制, 是否可以保证不丢失呢?    不行的, 因为 如何 生产者设置ack为 0或在 1的时候, 副本再多可能都没用
	一般保证不丢失: 多副本 + ack为 -1(ALL)

消费端如何保证数据不丢失
在这里插入图片描述

不丢失机制流程:
	1) consumer连接kakfa集群, 开始读取数据进行消费, 首先kafka集群会根据传递过来group.id 查询上一次
	消费到哪个偏移量, 如果没有找到, 默认从当前位置开始消费, 如果找到了, 从上一次位置开始消费
	2) 消费者接收到消息后,开始消费数据即可
	3) 当消费者消费完成后, 将当前消费偏移量提交给kaka集群, 集群更新一下当前这个消费者组消费的偏移量的位置信息

总结: 通过此种模型, 可以确保数据不会丢失, 但是会导致数据重复消费问题


那么偏移量信息是记录在那个位置呢? 
	在老版本(0.8x以前)的kafka中, 偏移量信息是保持在zookeeper中, 但是在0.8x后, 偏移量数据保存到broker端, 
	通过一个topic来存储: __consumer_offsets 
		此topic有50个分区, 每个分区有一个副本

消费者偏移量的提交方式:
	自动提交偏移量 (推荐使用)
	手动提交偏移量

手动提交偏移量 :

	import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

// 模拟消费者JAVA API_ 手动提交偏移量
public class KafkaConsumerTest02 {

    public static void main(String[] args) {

        //1. 创建kafka的消费者的核心对象: KafkaConsumer

        //1.1: 创建消费者配置对象, 并设置相关的参数:
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.setProperty("group.id", "test"); // 消费者组的 id
        props.setProperty("enable.auto.commit", "false"); // 是否启动消费者自动提交消费偏移量
        //props.setProperty("auto.commit.interval.ms", "1000"); // 每间隔多长时间提交一次偏移量: 单位 毫秒
        // key 反序列化
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
        // val 发序列化
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //2. 给消费者设置订阅topic:
        consumer.subscribe(Arrays.asList("test01"));

        //3. 循环获取相关的消息数据
        while (true) {
            //3.1: 从kafka中获取消息数据: 参数表示等待超时时间
            //  注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // 3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象,
            // 一个对象就是一条消息
            for (ConsumerRecord<String, String> record : records) {
                String massage = record.value();

                System.out.println("消息数据为:"+massage);


                consumer.commitSync(); // 同步提交偏移量

            }
        }


    }
}

kafka的消息存储和查询机制

kafka的消息存储
在这里插入图片描述

如何修改默认168小时呢?  修改server.properties	
log.retention.hours=168

如何修改默认1GB log文件大小呢?  修改server.properties	
log.segment.bytes=1073741824

kafka的数据查询机制
在这里插入图片描述
需求: 假设目前某个分片下有如下的目录结构, 请查询 368776这个偏移量消息:

1) 确定消息在那个segment段中: 在第二个segment段中

2) 根据消息的偏移量到index文件中, 寻找对应消息在log文件的物理偏移范围

3) 读取log文件, 采用顺序查询方式, 找到对应范围下的数据, 直接获取

磁盘: 顺序读写 和 随机读写:   这两种对磁盘的读写操作, 那个效率更高一些?   顺序读写

kafka中生产者的数据分发策略

生产者的数据分发策略:  
当生产者将数据生产到对应topic中后, 那么这条数据最终被topic中那个分片所接收, 这就是分发机制

思考: 我们能想到分发策略有那些呢?  
1) 轮询
2) hash取模方案
3) 指定分片方式
4) 基于范围分发

那么kafka采用哪种分发策略呢? 
1) 轮询(在新版本中:2.4以上 更改为 粘性分区方案)
2) hash取模方案
3) 指定分片方式
4) 自定义分发策略


如何实现分发策略呢?  
分区类: DefaultPartitioner (默认分区类)
生产者数据承载对象:  ProducerRecord

如何模拟指定分区方案:  与分区类没有任何关系, 不需要采用分区类进行分发
// 参数2: 表示指定往哪个分片上发送数据: 分片编号是0开始
public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}

如何模拟hash取模方案: 
public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value, null);
}
此时会基于 DefaultPartitioner 根据key计算发送到那个分片上
// 参数1: topic名称  参数2: key值  参数3: key值字节数据  参数4: value值, 参数5; value字段数组 参数6: 集群对象
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {  // 判断key是否为null
        return stickyPartitionCache.partition(topic, cluster);
    } 
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;  // 当key不为null, 基于hash取模计算操作
}

注意: 采用hash取模方式, 一定要确保key是可变的, 否则会出现所有的数据发往同一个分片情况 
	好处: 相同key 可以发往同一个分片上

如何模拟粘性分区方案: 
public ProducerRecord(String topic, V value) {
    this(topic, null, null, key, value, null);
}	

    此时会基于 DefaultPartitioner 根据key计算发送到那个分片上
// 参数1: topic名称  参数2: key值  参数3: key值字节数据  参数4: value值, 参数5; value字段数组 参数6: 集群对象
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {  // 判断key是否为null
        return stickyPartitionCache.partition(topic, cluster);  // 粘性分区操作
    } 
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;  
}

粘性分区: 粘性分区是kafka在2.4版本以上新推出一种分发策略, 主要是为了替代轮询方案
	描述: 在进行分发数据时候, 首先会先随机选择某一个分片, 然后尽可能黏上这个分区, 
	将这一批数据全部写入到这个分区上即可, 每次写入一批数据, 都会先随机选择一个分片
	
	
轮询: 三个分片  0   1  2
	一批数据: 0 1 2 3 4 5 6 7 8 9
	
	0分片:  0 3 6 9
	1分片:  1 4 7
	2分片:  2 5 8
轮询方式在发送一批数据到broker端, 对数据根据分片数量拆分多个小的批次, 
一个批次对应一个分片, 发送到对应分片即可, 此种操作由于需要再次进行划分批次, 导致整个效率相对较低


如何自定义分区策略: 抄   抄DefaultPartitioner
1) 创建一个类, 实现 Partitioner 接口
2) 重写接口下的方法:  
	int partition(String topic, Object key, byte[] keyBytes, Object value, 
	byte[] valueBytes, Cluster cluster);  -- 设置分区的方法, 返回值表示要分到那个区
	void close(); -- 释放资源
	
3) 在 partition方法中自定义分区策略, 返回值为对应要分发的分区编号
4) 在生产者的properties对象中, 设置自定义分区类:
	key: partitioner.class
	value值: 
		默认值为: org.apache.kafka.clients.producer.internals.DefaultPartitioner
		修改为自定义类的 权限类名

kafka的消费者负载均衡的机制—消息积压

在这里插入图片描述

负载均衡机制规定: 
在一个消费者组内, 消费者数量最多和所监听的topic的分片数量相等, 如果消费者数量大于分片数量, 
必然会有某些消费者无法消费数据, 处于闲置状态

思考点:

请问 如何模拟 点对点发送消息模型? 
让所有监听这个topic的消费者都在同一个组中即可

请问  如何模拟 发布订阅发送消息模型?
让所有监听这个topic的消费者都在不同组中即可
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-11 16:42:24  更:2021-07-11 16:42:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/7 0:38:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码