IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SparkStreaming 数据生成器代码 -> 正文阅读

[大数据]SparkStreaming 数据生成器代码

模拟爬虫从网络中爬取数据的过程,并将数据灌输到Kafka中,供下游消费者使用

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wspark</groupId>
    <artifactId>KafkaProducer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>


            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.12.12</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.12.12</version>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.2</version>
            <!--<scope>compile</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>

                </exclusion>

                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-reflect</artifactId>

                </exclusion>

                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-compiler</artifactId>

                </exclusion>

            </exclusions>
        </dependency>

        <!--Spark核心依赖包-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.2</version>
            <!--<scope>compile</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-reflect</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-compiler</artifactId>

                </exclusion>
            </exclusions>
        </dependency>

        <!--SparkStreaming依赖包-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.1.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-reflect</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-compiler</artifactId>

                </exclusion>
            </exclusions>
        </dependency>


        <!--Spark Streaming Kafka依赖包 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.1.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-reflect</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-compiler</artifactId>

                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.1.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-reflect</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-compiler</artifactId>

                </exclusion>
            </exclusions>
        </dependency>


        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>3.1.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-reflect</artifactId>

                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!--   <dependency>
               <groupId>org.scala-lang</groupId>
               <artifactId>scala-swing</artifactId>
               <version>2.12.12</version>
           </dependency>-->

        <!--<dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.0</version>
        </dependency>-->

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
            <scope>compile</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
  

    </dependencies>

</project>

完成jar包引入,创建Producer进行虚拟生产者的开发,连接kafka并向其推送模拟的用户评论数据

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.Random

/*
* 用户生成模拟数据的生产者
* */

object Producer extends App {

  @Override
  override def main(args: Array[String]): Unit = {

    val rnd = new Random()
    val events = 100000 // 一万条消息
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.30.21:9092,192.168.30.22:9092,192.168.30.23:9092") // broker集群地址
    props.put("client.id", "KafkaFreqGenerator")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") // 可以序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") // value 序列化

    val producer = new KafkaProducer[String, String](props) // 创建kafka对象
    val startTime = System.currentTimeMillis(); // 毫秒时间

    // 读取汉字字典 /home/hadoop/tools/jsonfile/  当前工作目录
    val source = scala.io.Source.fromFile("D:\\360Downloads\\hanzi.txt") // 从指定的目录读取文本文件
    val lines = try source.mkString finally source.close() // try finally 使用技巧
    for (nEvents <- Range(0, events)) { // 循环生成一万条用户的评论数据
      // 模拟评论数据(user,comment)
      val sb = new StringBuilder()
      for (ind <- Range(0, rnd.nextInt(200))) {
        sb += lines.charAt(rnd.nextInt(lines.length()))
      }
      val userName = "user_" + rnd.nextInt(100)
      //println("<message content> :"+sb.toString())
      val data = new ProducerRecord[String, String]("t01", userName, sb.toString()) // userName --  key sb.toString()为值
      // 发送评论到kafka
     /* val futureCallback = producer.send(data)
      val answer = futureCallback.get()
      val topic = answer.topic();*/
      //println("messege has sended : "+topic)
      producer.send(data)
    }
    System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - startTime))
    producer.close()
  }
}

?

为了验证数据生成器效果,创建ProducerConsumer,创建一个简单的消费者来进行检验

import java.time.Duration
import java.util.concurrent._
import java.util.{Collections, Properties}
import scala.collection.JavaConversions._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

class ProducerConsumer  (val brokers:String,
                         val groupId:String,
                         val topic:String){
  val props  = createCunsumerConfig(brokers,groupId)
  val consumer = new KafkaConsumer[String,String](props)

  def shutdown() = {  // 关闭函数
    if(consumer != null)
      consumer.close()
  }
  // 定义类中的方法
  def createCunsumerConfig(brokers: String,groupId: String): Properties ={  // 定义类中的方法
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers) // 设置kafaka集群地址
    props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true") // 设置自动提交
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000")
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")
    return props
  }
  // 定义无返回值的方法
  def run()={
    consumer.subscribe(Collections.singletonList(this.topic)) // 一个topic主题数组
    Executors.newSingleThreadExecutor.execute(new Runnable {
      override def run(): Unit = {  // 覆盖run方法
        //不断拉取指定Topic的数据,并打印输出查看+ T
        while (true) {
          val records = consumer.poll(Duration.ofSeconds(1000)) // 1秒钟拉一次
          for (record <- records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset())
          }
        }
      }
    })
  }
}
object ProducerConsumer extends App {  // 伴生对象
  override def main(args: Array[String]): Unit = {
    val brokers = "192.168.30.21:9092,192.168.30.22:9092,192.168.30.23:9092"
    val groupId ="KafkaProducerConsumerGroup"
    val topic ="t01"
    val example = new ProducerConsumer(brokers,groupId,topic)
    example.run()
  }
}

结果:

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-01 12:00:29  更:2021-09-01 12:01:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:56:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码