IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark-Streaming -> 正文阅读

[大数据]spark-Streaming

spark-Streaming

1、SparkStreaming简介

  • SparkStreaming是流式处理框架,是Spark API(RDD)的扩展支持可扩展、高吞吐量、容错的准实时数据流处理
  • 实时数据的来源可以是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,在接受数据同时可以使用高级功能的复杂算子来处理流数据
  • 最终处理后的数据可以存放在文件系统,数据库等,方便实时展现。

在这里插入图片描述

2、SparkStreaming与Storm的区别

  • 处理模型以及延迟
    • 都提供了可扩展性(scalability)和可容错性(fault tolerance)
    • Storm可以实现亚秒级时延的处理,而每次只处理一条event,而 Spark Streaming可以在一个短暂的时间窗口里面处理多条(batches)Event
    • 说Storm可以实现亚秒级时延的处理,而Spark Streaming则有一定的时延。
  • 容错和数据保证
    • 在Storm中,每条记录在系统的移动过程中都需要被标记跟踪,所以Storm只能保证每条记录最少被处理一次,但是允许从错误状态恢复时被处理多次。
    • Spark Streaming仅仅需要在批处理级别对记录进行追踪,所以他能保证每个批处理记录仅仅被处理一次
  • 实现和编程API
    • Storm主要是由Clojure语言实现,Spark Streaming是由Scala实现。
    • Storm提供了Java API,同时也支持其他语言的API。 Spark Streaming支持Scala和Java语言 (其实也支持Python)。
  • 批处理框架集成
    • 是在Spark框架上运行的。这样你就可以想使用其他批处理代码一样来写Spark Streaming程序,或者是在Spark中交互查询。这就减少了单独 编写流批量处理程序和历史数据处理程序。
  • 生产支持

3、SparkStreaming流式计算

3.1. 流式计算过程

  • Spark Streaming是将流式计算分解成一系列短小的批处理作业(batch)
  • 批处理引擎是Spark Core,按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每段数据都转换成Spark中的RDD(Resilient Distributed Dataset)
  • Streaming中对DStream的Transformation操作变为针对Spark中对RDD的 Transformation操作
  • 将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结 果进行叠加或者存储到外部设备。

在这里插入图片描述

  • Spark Streaming在内部的处理机制:

    • 接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处 理这些批数据,最终得到处理后的一批批结果数据。
    • 对应的批数据,在Spark内核对应一个RDD实例,因此,对应流数据的DStream可以看成是一 组RDDs,即RDD的一个序列
    • 通俗点理解的话,在流数据分成一批一批后,通过一个先进先出的队列,然后 Spark Engine 从该队列中依次取出一个个批数据,把批数据封装成一个RDD,然后进行处理,这是一个典型的生产者消费者模型,对应的就有生产者消费者模型的问题,即如何协调生产速率和消费速 率。
  • receiver task

    • 在这里插入图片描述

    • receiver task是一直在执行,一直接受数据,将一段时间内接收来的数据保 存到batch中。

    • 假设batchInterval 为 5s,那么会将接收来的数据每隔 5 秒封装到一个 batch 中

    • batch 没有分布式计算特性,这一个batch的数据又被封装到一个RDD中最终封装到一个 DStream中,然后sparkStreaming回启动一个job去计算.

3.2. 流式计算特性

  • 容错性
    • 每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作继承关系 (lineage)
    • 只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是可以 利用原始输入数据通过转换操作而重新算出的。
    • 每一行最后一个 RDD则表示每一个Batch Size所产生的中间结果RDD。都是通过lineage相连接的。
  • 实时性
    • Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。
    • 最小的Batch Size的选取在0.5~2秒钟之间(Storm 目前最小的延迟是100ms左右)
  • 扩展性与吞吐量
    • 已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理 6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍

3.3. 编程模型DStream

在这里插入图片描述

  • DStream(Discretized Stream)作为Spark Streaming的基础抽象,它代表持续性的数据流
  • DStream由一组时间序列上连续的RDD来表示。每个RDD都包含了自己特定时间间隔内的数据流。
  • DStream与RDD
    • Spark Streaming 计算还是基于Spark Core的Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上关系。
    • Spark Streaming 并没有直接让用户使用RDD而是自己抽象了一套DStream的概念
    • DStream 和 RDD 是包含的关系,你可以理解为Java里的装饰模式,也就是DStream 是对 RDD的增强,但是行为表现和RDD是基本上差不多的。

4、SparkStreaming代码实现

4.1. 代码实现

  • pom.xml

    • <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
      -->
      <dependency>
      	<groupId>org.apache.spark</groupId>
      	<artifactId>spark-streaming_2.12</artifactId>
      	<version>2.4.6</version>
      </dependency>
      <!-- https://mvnrepository.com/artifact/org.apache.spark/sparkstreaming-kafka-0-10 -->
      <dependency>
      	<groupId>org.apache.spark</groupId>
      	<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      	<version>2.4.6</version>
      </dependency>
      
  • 代码实现

    • def main(args: Array[String]): Unit = {
        //搭建环境
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App")
        //设置参数
        val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(5))
        //获取数据
        val value: ReceiverInputDStream[String] = streamingContext.socketTextStream("localhost", 9999)
        //操作数据
        val dStream: DStream[(String, Int)] = value.flatMap(_.split("\\s")).map((_, 1)).reduceByKey((x, y) => x + y)
        //打印数据
        dStream.print()
        //开启服务
        streamingContext.start()
        //等待停止
        streamingContext.awaitTermination()
        //关闭服务
        streamingContext.stop(false)
      }
      

4.2. DStream转换操作

  • transform(func)
    • 通过对源DStream的每RDD应用RDD-to-RDD函数返回一个 新的DStream,这可以用来在DStream做任意RDD操作。
  • updateStateByKey(func)
    • 该 updateStateByKey 操作可以让你保持任意状态,同时不断有新的信息进行更新。要使用此功能,必须进行两个步骤 :
    • 定义状态 - 状态可以是任意的数据类型。
    • 定义状态更新函数 - 用一个函数指定如何使用先前的状态和从输入流中获取的新值 更新状态
    • 使用到updateStateByKey要开启checkpoint机制和功能。

4.3. DStream窗口操作

  • 在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的,因此在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采 集到的数据汇总起来成为一批数据交给系统去处理。

  • 窗口操作而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔 (window duration)决定,而窗口间隔指的就是窗口的持续时间,只有窗口 的长度满足了才会触发批数据的处理

  • 有另一参数就是滑动间隔(slide duration),它指的是经过多长时间窗口滑动一次形成新的窗口,滑动窗口默认情况下和批次间隔的相同

  • 在这里插入图片描述

  • 批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。

  • 窗口长度和滑动间隔必须是batchInterval的整数倍。如果不是整数倍会检测报错

  • window(windowLength, slideInterval)

    • 窗口总长度(window length):你想计算多长时间的数据
    • 滑动时间间隔(slide interval):你每多长时间去更新一次
  • window(windowLength, slideInterval) 返回一个基于源DStream的窗口批次计算后得到新的DStream。

  • countByWindow(windowLength,slideInterval) 返回基于滑动窗口的DStream中的元素的数量。

  • reduceByWindow(func, windowLength,slideInterval) 基于滑动窗口对源DStream中的元素 进行聚合操作,得到一个新的 DStream。

  • countByValueAndWindow(windowLength,slideInterval, [numTasks]) 基于滑动窗口计算源DStream中每个 RDD内每个元素出现的频次并返回 DStream[(K,Long)],其中K是RDD 中元素的类型,Long是元素频次。

4.4. DStream输出操作

  • print() 打印出DStream中数据的前10个元素
  • saveAsTextFiles(prefix, [suffix]) 将DStream中的内容以文本的形式保存为文本文件,件以prefix-TIME_IN_MS[.suffix]的 方式命名。
  • saveAsObjectFiles(prefix, [suffix]) 按对象序列化并且以SequenceFile的格式保存。
  • saveAsHadoopFiles(prefix, [suffix]) 容以文本的形式保存为Hadoop文件,
  • foreachRDD(func) 将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统

5、SparkStreaming数据源

5.1. 基础数据源

  • streamingContext.socketTextStream()方法,可以通过 TCP 套接字连接,从文本数据中创建了一个 DStream。
  • streamingContext.**fileStream(dataDirectory)方法可以从任何文件系统 (如:HDFS、S3、NFS 等)**的文件中读取数据。
  • streamingContext.textFileStream(dataDirectory)来读取简单的文本件
  • streamingContext.actorStream(actorProps, actor-name)可以基于自定 义 Actors 的流创建DStream。
  • treamingContext.queueStream(queueOfRDDs)方法可以创建基于 RDD 队列的DStream。

5.2. 高级数据源

  • Twitter:Spark Streaming的TwitterUtils工具类使用Twitter4j。
  • Flume:Spark Streaming可以从Flume中接受数据
  • Kafka:Spark Streaming可以从Kafka中接受数据
  • Kinesis:Spark Streaming可以从Kinesis中接受数据。

7、SparkStreaming2.2(包含以前)+Kafka0.8

  • pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>2.4.6</version>
    </dependency>
  • scala代码实现
    • 首先要启动zkserver
    • 再启动kafka
  def main(args: Array[String]): Unit = {
    //搭建环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("stream05_kafka")
      .set("spark.streaming.stopGracefullyOnShutdown","true")
    //设置streaming封装间隔
    val streamingContext = new StreamingContext(sparkConf, streaming.Seconds(3))

    //kafka配置
    val kafkaPar: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yjx_bigdata",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (true: lang.Boolean)
    )
    //创建主题
    val topics: Array[String] = Array("userlog")
    //创建kafka
    val kfDStream: InputDStream[ConsumerRecord[String, String]]
        = KafkaUtils.createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaPar))
    //编辑数据
    val result: DStream[(String, Int)] = kfDStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    //打印数据
    result.print()
    //result.saveAsHadoopFiles("yjx", "txt")

    //开启streaming
    streamingContext.start()
    streamingContext.awaitTermination()
    //关闭streaming
    streamingContext.stop(false)

  }

8、Kafka接受数据方式

  • Kafka与Spark Streaming集成时有两种方法:旧的基于receiver的方法,新的基于direct stream的 方法

在这里插入图片描述

8.1. 基于receiver的方法

  • 基于receiver的方法采用Kafka的高级消费者API每个executor进程都不断拉取消息,并同时保存在executor内存与HDFS上的预写日志(write-ahead log/WAL)。当消息写入WAL后, 自动更新ZooKeeper中的offset
  • 可以保证at least once语义,但无法保证exactly once语义。虽然引入了WAL来确保消息不会丢失,但还有可能会出现消息已经写入WAL,但offset更新失败的情况,Kafka就会按上一次的offset重新发送消息。
  • receiver的并行度是由spark.streaming.blockInterval来决定的,默认为200ms
  • 假设 batchInterval为5s,那么每隔5s就会产生一个block,这里就对应每批次产生RDD的 partition,这样5秒产生的这个Dstream中的这个RDD的partition为25个,并行度就是25。如 果想提高并行度可以减少blockInterval的数值,但是最好不要低于50ms。

在这里插入图片描述

8.2. 基于direct stream的方法

  • 基于direct stream的方法采用Kafka的简单消费者API,它的流程大大简化了。executor不再从Kafka中连续读取消息,也消除了receiver和WAL。还有一个改进就是Kafka分区与RDD分区是一一对应的,更可控。

  • driver进程只需要每次从Kafka获得批次消息的offset range,然后executor进程根据offset range去读取该批次对应的消息即可。由于offset在Kafka中能唯一确定一条消息,且在外部只 能被Streaming程序本身感知到,因此消除了不一致性,达到了exactly once。

  • 由于它采用了简单消费者API,我们就需要自己来管理offset

  • Direct模式的并行度是由读取的kafka中topic的partition数决定的

在这里插入图片描述

8.3. 偏移量offset处理

  • SparkStreaming Kafka 维护offset 官网有三种实现方式
    • Checkpoints
    • Kafka itself
    • Your own data store 自定义

8.3.1. CheckPoint

  • spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的offset的方式是通过checkpoint来记录每个批次的状态持久化到HDFS中,如果机器发生故障,或者程序故障停止, 下次启动时候,仍然可以从checkpoint的目录中读取故障时候rdd的状态,便能接着上次处理的数据继续处理。
  • checkpoint方式最大的弊端是如果代码升级,新版本的jar不能复用旧版本的序列化状态,导致两个版本不能平滑过渡,结果就是要么丢数据,要么数据重复。
  • checkpoint第一次持久化的时候会把整个相关的jar给序列化成一个二进制文件,每次重启都会从里面恢复,但是当你新的程序打包之后序列化加载的仍然是旧的序列化文件,这就会导致报错或者依旧执行旧代码。
  • 一旦你删除了旧的checkpoint,新启动的程序,只能从kafka的smallest或者largest的偏移量消 费,默认是从最新的。

8.3.2. Kafka Itself

  • kafka本身就有机制可以定时存储消费者分组的偏移量,但是这样会有重复消费的情况,还如果采用这种方式那么就是将kafka的offset全部交给kafka管理
  • kafka的的数据其实也是内存和磁盘存储的,如果数据量上来了,无疑也是对kafka集群的一种压力。
  • 因为我们项目在实际开发中的时候,遇到数据峰值很高的时候kafka集群的磁盘io是特别高的,这样是非常不安全的。

8.3.3. Your own data store

  • 自己写代码管理offset,其实就是把每批次offset存储到一个外部的存储系统里面包括(Hbase,HDFS,Zookeeper,Kafka,DB等等
    • 当一个新的spark streaming+kafka的流式项目第一次启动的时候,这个时候发现外部的存储系统并没有记录任何有关这个topic所有分区的偏移量,所以就从 KafkaUtils.createDirectStream直接创建InputStream流默认是从最新的偏移量消费,如果 是第一次其实最新和最旧的偏移量时相等的都是0,然后在以后的每个批次中都会把最新的offset给存储到外部存储系统中,不断的做更新。
    • 当流式项目停止后再次启动,会首先从外部存储系统读取是否记录的有偏移量,如果有的话, 就读取这个偏移量,然后把偏移量集合传入到KafkaUtils.createDirectStream中进行构建 InputSteam,这样的话就可以接着上次停止后的偏移量继续处理,

9、数据的反压机制

? 数据流入的速度远高于数据处理的速度,对流处理系统构成巨大的负载压力,如果不能正确处理, 可能导致集群资源耗尽最终集群崩溃,因此有效的反压机制(backpressure)对保障流处理系统的稳定至关重要。

在这里插入图片描述

9.1. Storm反压

  • 旧版本
    • 开启了acker机制的storm程序,可以通过设置conf.setMaxSpoutPending参数来实现反压效果,如果下游组件(bolt)处理速度跟不上导致spout发送的tuple没有及时确认的数超过了参数设定的值spout会停止发送数据
    • 但是conf.setMaxSpoutPending参数的设置很难达到最好的反压效果
      • 设小了会导致吞吐上不去
      • 设大了会导致worker OOM;有震荡,数据流会处于一个颠簸状态,效果不如逐级反 压;
      • 另外对于关闭acker机制的程序无效;
  • 新版本
    • storm自动反压机制(Automatic Back Pressure)通过监控bolt中的接收队列的情况当超过高水位值时专门的线程会将反压信息写到 Zookeeper ,Zookeeper上的watch会通知该拓扑的所有Worker都进入反压状态,最后Spout降低tuple发送的速度

在这里插入图片描述

9.2. Spark反压

  • 旧版本

    • Spark Streaming程序中当计算过程中出现batch processing time > batch interval的情况时,(其中batch processing time为实际计算一个批次花费时间,batch interval为Streaming 应用设置的批处理间隔)

    • 意味着处理数据的速度小于接收数据的速度,如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟)

    • 可以通过设置参数spark.streaming.receiver.maxRate来限制Receiver的数据接收速率,此举 虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。

    • 比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

  • 新版本

  • Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力

  • Spark Streaming Backpressure: 根据JobScheduler反馈作业的执行信息来动态调整 Receiver数据接收率

  • 在这里插入图片描述

  • spark streaming的数据源方式有两种:

    • 若是基于Receiver的数据源,可以通过设置spark.streaming.receiver.maxRate来控制最大输入速率
    • 若是基于Direct的数据源(如Kafka Direct Stream),则可以通过设置 spark.streaming.kafka.maxRatePerPartition来控制最大输入速率
  • 反压原理

    • RateController 继承自接口StreamingListener,并实现了onBatchCompleted方 法。每一个Batch处理完成后都会调用此方法。记录批处理的处理时间与数量。
    • RateEstimator 是速率估算器,主要用来估算最大处理速率。
    • RateLimiter 是一个抽象类,实质上是一个限流器,也可以叫做令牌
      • 如果Executor中task每秒计算的速度大于该值则阻塞,如果小于该值则通过将流数据加入缓存中进行计算。这种机制也可以叫做令牌桶机制。
      • 令牌桶机制: 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回

在这里插入图片描述

  • 注意:
    • 反压机制真正起作用时需要至少处理一个批
    • 保证反压机制真正起作用前应用不会崩溃,需要控制每个批次最大摄入速率

10、SparkStreaming事务

  • Exactly-once 语义是实时计算的难点之一。要做到每一条记录只会被处理一次,即使服务器或网络 发生故障时也能保证没有遗漏,这不仅需要实时计算框架本身的支持,还对上游的消息系统、下游 的数据存储有所要求。

  • 一个Spark Streaming程序由三步组成:输入、处理逻辑、输出。要达到exactly once的理想状态,需要三步协同进行,而不是只与处理逻辑有关。

10.1. 逻辑处理

  • Spark容错分为:Driver级别的容错和Executor级别的容错。
    • Driver级别的容错具体为DAG生成的模板,即DStreamGraph,存储着RecevierTracker中存储的元数据信息和JobScheduler中存储的Job进行的进度情况等信息,只要通过checkpoint就可以了,每个Job生成之前进行checkpoint,在Job生成之后再进行checkpoint,如果出错的话就从checkpoint中恢复
    • 在Executor级别的容错具体为接收数据的安全性任务执行的安全性。在接收数据安全性方面,一种方式是Spark Streaming接收到数据默认为MEMORY_AND_DISK_2的方式,另外一种方式是WAL(Write Ahead Log),在 数据到来时先通过WAL机制将数据进行日志记录,如果有问题则从日志记录中恢复。
  • Spark Streaming的容错机制是基于RDD的容错机制
    • checkpoint
    • 基于血统(lineage)的高度容错机制
    • 出错了之后会从出错的位置从新计算,而不会导致重复计算。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-15 15:39:36  更:2021-08-15 15:42:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年9日历 -2024/9/28 23:27:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码