IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark数据分析 -> 正文阅读

[大数据]spark数据分析

spark案例

1、sparkpi

hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ vim SparkPi
hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ run-example SparkPi 10 > SparkPi.txt
object SparkPi {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / (n - 1))
    spark.stop()
  }
}
hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ more SparkPi.txt 

Pi is roughly 3.1427711427711427

2、单词计数

对/opt/spark-2.1.0-bin-hadoop2.7/examples/src/main/java/org/apache/spark/examples目录中的JavaWordCount,java进行编译

详细请查阅:http://www.360doc.com/content/17/0323/17/41381374_639514974.shtml

运行

hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7$ bin/spark-submit --ddai-master spark://ddai-master:7077 --class org.apache.spark.examples.JavaWordCount --executor-memory 2g examples/jars/spark-examples_2.11-2.1.0.jar /input > WordCount.txt

代码详解

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }
    //SparkSession为用户提供了一个统一的切入点来使用Spark的各项功能,
    //SparkSession的设计遵循了工厂设计模式(factory design pattern)。
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaWordCount")
      .getOrCreate();
    //创建完SparkSession之后,我们就可以使用它来读取数据,
    //textFile()读取数据,javaRDD()定义一个RDD,返回每一行作为一个元素的RDD。
    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
    //FlatMapFunction<String, String>第一个参数为传入的内容,
    //第二个参数为函数操作完后返回的结果类型,将每一行映射成多个单词。
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String s) {
        return Arrays.asList(SPACE.split(s)).iterator();
      }
    });
    //map操作:将单词转化成(word, 1)对
    //PairFunction<String, String, Integer>第一个参数为内容,
    //第三个参数为函数操作完后返回的结果类型。
    JavaPairRDD<String, Integer> ones = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      });
    //reduce操作: 分组并按键值添加对以产生计数。
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    //完成单词集合计算,返回整个RDD。
    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    //Spark服务关闭
    spark.stop();
  }
}

spark函数

在这里插入图片描述

hadoop@ddai-master:~$ hdfs dfs -mkdir /spark
hadoop@ddai-master:~$ hdfs dfs -put spam.data /spark
hadoop@ddai-master:~$ hdfs dfs -text /spark/spam.data

在这里插入图片描述
在这里插入图片描述

map:文本映射成双精度

scala> val nums = inFile.map(x=>x.split(’ ').map(_.toDouble))
nums: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[2] at map at <console>:26

scala> nums.first()
res1: Array[Double] = Array(0.0, 0.64, 0.64, 0.0, 0.32, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.64, 0.0, 0.0, 0.0, 0.32, 0.0, 1.29, 1.93, 0.0, 0.96, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.778, 0.0, 0.0, 3.756, 61.0, 278.0, 1.0)

collect:list转化成rdd

scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> val mapRdd = rdd.map(2*_)
mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:26
scala> mapRdd.collect
res2: Array[Int] = Array(2, 4, 6, 8, 10)

filter:数据过滤

scala> val filterRdd = mapRdd.filter(_>5)
filterRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at filter at <console>:28
scala> filterRdd.collect
res3: Array[Int] = Array(6, 8, 10)

flatMap:将单词转化成(key, value)对

scala> val rdd = sc.textFile("/input")
rdd: org.apache.spark.rdd.RDD[String] = /input MapPartitionsRDD[30] at textFile at :24
scala> rdd.cache
res12: rdd.type = /input MapPartitionsRDD[30] at textFile at :24
scala> val wordCount = rdd.flatMap(.split(’ ')).map(x=>(x,1)).reduceByKey(+_)
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[33] at reduceByKey at :26
scala> wordCount.collect
res13: Array[(String, Int)] = Array((little.,1), (between,1), (station,1), (too,2), (hadoop,3), (a,1), (good,1), (much,1), (is,1), (hello,2), (bye,1), (world,1), (Happiness,1), (and,1), (way,1))
scala> wordCount.saveAsTextFile("/output/w")

在这里插入图片描述

union:联合运算

scala> val rdd1 = sc.parallelize(List((‘a’,1),(‘a’,2)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List((‘b’,1),(‘b’,2)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd1 union rdd2
res4: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[8] at union at <console>:29

scala> res4.collect
res7: Array[(Char, Int)] = Array((a,1), (a,2), (b,1), (b,2))

join:连接运算

scala> val rdd1 = sc.parallelize(List((‘a’,1),(‘a’,2),(‘b’,3),(‘b’,4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List((‘a’,5),(‘a’,6),(‘b’,7),(‘b’,8)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd3 = rdd1 join rdd2
rdd3: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[13] at join at <console>:28
scala> rdd3.collect
res8: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6)))

lookup:按key查找values

scala> var rdd1 = sc.parallelize(List((‘a’,1),(‘a’,2),(‘b’,3),(‘b’,4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:24
scala> rdd1.lookup(‘a’)
res9: Seq[Int] = WrappedArray(1, 2)

groupByKey:按key值进行分组

scala> var wc = sc.textFile("/input").flatMap(.split(’ ')).map((,1)).groupByKey
wc: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[21] at groupByKey at <console>:24
scala> wc.collect
res10: Array[(String, Iterable[Int])] = Array((little.,CompactBuffer(1)), (between,CompactBuffer(1)), (station,CompactBuffer(1)), (too,CompactBuffer(1, 1)), (hadoop,CompactBuffer(1, 1, 1)), (a,CompactBuffer(1)), (good,CompactBuffer(1)), (much,CompactBuffer(1)), (is,CompactBuffer(1)), (hello,CompactBuffer(1, 1)), (bye,CompactBuffer(1)), (world,CompactBuffer(1)), (Happiness,CompactBuffer(1)), (and,CompactBuffer(1)), (way,CompactBuffer(1)))

sortByKey:按key进行排序,参数false表示倒序

scala> var wc = sc.textFile("/input").flatMap(.split(’ ')).map((,1)).sortByKey(false)
wc: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at sortByKey at <console>:24
scala> wc.collect
res11: Array[(String, Int)] = Array((world,1), (way,1), (too,1), (too,1), (station,1), (much,1), (little.,1), (is,1), (hello,1), (hello,1), (hadoop,1), (hadoop,1), (hadoop,1), (good,1), (bye,1), (between,1), (and,1), (a,1), (Happiness,1))

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-16 11:48:52  更:2021-08-16 11:52:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:03:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码