IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 大数据课程——Spark RDD 编程 -> 正文阅读

[大数据]大数据课程——Spark RDD 编程

大数据课程——Spark RDD 编程

?

实验内容以及要求

现有大约500万条搜索引擎产生的记录,数据格式如下:
在这里插入图片描述
每一行包含6个字段:
字段1代表数据产生的时间;
字段2代表用户,即UID;
字段3代表用户搜索关键词;
字段4代表URL超链接在返回结果中的排名;
字段5代表用户单击超链接的顺序号;
字段6代表用户单击的URL超链接的地址。
请编写Scala程序,实现如下功能:
(1)统计用户数量,输出格式如下:
在这里插入图片描述
(2)统计搜索次数在20次及以上的用户UID及搜索次数,输出格式(按照搜索次数降序排列,搜索次数相同时按UID升序排列)如下:
在这里插入图片描述
注意:
(1)数据请上传至HDFS;
(2)统计结果保存至HDFS。

?

问题总结

首次使用Scala语言进行编程

本次实验对于任务的思路来说,其实并不难,跟之前的某次实验是一样的。但难点主要在于对于Scala语言编程的首次使用,由于对Scala的语法不熟悉成为了本次实验的难点,比如不知道怎么写函数,怎么调用函数等。

?

节点拒绝连接

在实验过程中,本人猜测是zookeeper的问题,可能某个节点宕掉,导致运行结果不完整,会报“拒绝连接”的错误。如下图所示。后来重启了几次就解决了。
在这里插入图片描述
在这里插入图片描述

?
?

任务一

实现思路

得到数据后,首先进行数据分割,提取字段1,得到所有用户RDD。但其中会存在重复的用户(因为一个用户会进行多次搜索),所以要进行去重操作,除去重复的用户数据。最后添加行号,调整排序得到最终结果,最后一名用户的编号即为总人数。
?

核心代码(详细代码放在文末)

在这里插入图片描述

?

运行结果

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
?

任务二

实现思路

得到数据后,进行分割,提取字段1,并且将字段1和数字1组成一个元组作为一条新的记录(因为每个记录代表该用户进行了一次搜索)。
合并相同key值的记录,将其value值相加,统计该用户的总共的搜索次数。
使用二次排序,将统计结果进行排序,最后输出搜索次数大于等于20的记录。

?

核心代码

在这里插入图片描述

?

运行结果

在这里插入图片描述

在这里插入图片描述
运行中的图片

?

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

?

具体代码

MySortClass.scala

package spark.demo

/*
* 该class用于辅助实现二次排序
* */
class MySortClass(val x:String, val y:Int) extends Serializable with Ordered[MySortClass] {
  override def compare(that: MySortClass): Int = {
    if (!this.y.equals(that.y)) {
      this.y - that.y
    }
    else {
      this.x.hashCode - that.x.hashCode
    }
  }
}

?

SearchCount.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import spark.demo.MySortClass

object SearchCount{
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象,存储应用程序的配置信息
    val conf = new SparkConf()
    //设置名称
    conf.setAppName("My-Spark-Work")
    //设置集群master节点访问地址
    conf.setMaster("spark://centos01:7077");

    //创建SparkContext对象,该对象是提交Spark程序的入口
    val sc = new SparkContext();

    //分割数据
    val linesRDD:RDD[String] = sc.textFile(args(0))
    val usersRDD:RDD[String] = linesRDD.map(_.split("\t")(1))
    val paresRDD:RDD[(String, Int)] = usersRDD.map((_,1))
    //合并用户记录
    val userCountsRDD:RDD[(String, Int)] = paresRDD.reduceByKey((x,y)=>x+y)
    //排序
    val tmpRDD = userCountsRDD.map(a =>(new MySortClass(a._1, a._2), a))
    val userCountsSortRDD = tmpRDD.sortByKey()
//    val resultRDD = userCountsSortRDD.map(a=>a._2)
    //如果大于等于20,则返回结果
    val resultRDD = userCountsSortRDD.map(a=>{if(a._2._2>=20)a._2})
    resultRDD.saveAsTextFile(args(1))
    sc.stop();
  }
}

?

UserCount.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UserCount {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象,存储应用程序的配置信息
    val conf = new SparkConf()
    //设置名称
    conf.setAppName("My-Spark-Work")
    //设置集群master节点访问地址
    conf.setMaster("spark://centos01:7077");

    //创建SparkContext对象,该对象是提交Spark程序的入口
    val sc = new SparkContext();
    //读取指定路径,生成RDD集合
    val linesRDD:RDD[String] = sc.textFile(args(0))

    //得到用户列表
    val test = "qwe"
    val usersRDD:RDD[String] = linesRDD.map(_.split("\t")(1))
    //去除重复用户
    val tmpRDD:RDD[(String, Int)] = usersRDD.map((_, 1))
    val reduceRDD:RDD[(String, Int)] = tmpRDD.reduceByKey((x,y)=>x+y)
    //去除辅助value
    val reduceUserRDD:RDD[String] = reduceRDD.map(line=>{line._1})
    //添加行号
    val indexRDD:RDD[(String, Long)] = reduceUserRDD.zipWithIndex().map{case(x, y)=>(x, y+1)}
    //调整排序得到结果
    val resultRDD:RDD[(Long, String)] = indexRDD.map(line => {(line._2, line._1)})

    //保存结果到指定路径(取第二运行参数)
    resultRDD.saveAsTextFile(args(1))
    sc.stop();
  }
}

?

WordCount.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object WordCount {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象,存储应用程序的配置信息
    val conf = new SparkConf()
    //设置名称
    conf.setAppName("My-Spark-Work")
    //设置集群master节点访问地址
    conf.setMaster("spark://centos01:7077");

    //创建SparkContext对象,该对象是提交Spark程序的入口
    val sc = new SparkContext();

    //读取指定路径,生成RDD集合
    val linesRDD:RDD[String] = sc.textFile(args(0))
    //将RDD每个元素拆分,合并成新的RDD
    val wordsRDD:RDD[String] = linesRDD.flatMap(_.split(" "))
    //将RDD中每个单词和数字1组成元组
    val paresRDD:RDD[(String, Int)] = wordsRDD.map((_, 1))
    //对单词根据key进行聚合,对相同key进行value累加
    val wordCountsRDD:RDD[(String, Int)] = paresRDD.reduceByKey(_+_)
    //排序
    val wordCountsSortRDD:RDD[(String, Int)] = wordCountsRDD.sortBy(_._2, false)
    //保存结果到指定路径(取第二运行参数)
    wordCountsSortRDD.saveAsTextFile(args(1))
    sc.stop();
  }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-08 19:06:49  更:2022-06-08 19:07:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 4:38:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码