IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spark编程应用 -> 正文阅读

[大数据]spark编程应用

需要有Hadoop、zookeeper和spark的环境,配置详细请查阅本专栏的其他篇幅

搭建Scala-IDE环境

1、下载安装开发包
在这里插入图片描述
由于当前有个eclipse-ide,所以Scala-ide要改个名,不然冲突

hadoop@ddai-desktop:~$ sudo tar xvzf /home/hadoop/scala-SDK-4.7.0-vfinal-2.12-linux.gtk.x86_64.tar.gz 
hadoop@ddai-desktop:~$ sudo chown -R hadoop:hadoop eclipse/
hadoop@ddai-desktop:~$ sudo mv eclipse /opt/eclipse2

2、从主节点复制spark和Scala到desktop

hadoop@ddai-desktop:~$ sudo scp -r hadoop@ddai-master:/opt/scala-2.12.11 /opt
hadoop@ddai-desktop:~$ sudo scp -r hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7 /opt
hadoop@ddai-desktop:~$ sudo chown -R hadoop:hadoop /opt/scala-2.12.11/
hadoop@ddai-desktop:~$ sudo chown -R hadoop:hadoop /opt/spark-2.1.0-bin-hadoop2.7/

3、修改环境变量并生效

hadoop@ddai-desktop:~$ vim /home/hadoop/.profile 
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SCALA_HOME=/opt/scala-2.12.11
export SPARK_HOME=/opt/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:$SCALA_HOME/bin:$SPARK_HOME/bin

在这里插入图片描述

单词计数编程

使用Scala语言实现

在图形窗口运行Scala-Ide
在这里插入图片描述

选择“New”→“Scala Project”,输入项目名(Project Name):ScalaWC

右击“ScalaWC”→“src”,单击“New”→“Scala Class”,输入Name:WordCount,输入代码

import org.apache.spark.sql.SparkSession
object WordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: WordCount <file>")
      System.exit(1)
     }
    val spark = SparkSession
      .builder
      .appName("Scala WordCount")
      .getOrCreate()
    val line = spark.read.textFile(args(0)).rdd
    line.flatMap(_.split(" ")).map((_, 1))
      .reduceByKey(_ + _).collect().foreach(println)
    spark.stop()
  }
}

添加“/opt/spark-2.1.0-bin-hadoop2.7/jars”目录下所有jar包
在这里插入图片描述

右击“ScalaWC”,选择“Scala”,单击“Set the Scala Installation”,选择“Fix Scala Installation: 2.10.6(bundled)
在这里插入图片描述

右击“ScalaWC”→“src”→“WordCount.scala”,单击“Export…”,选择“Java”→“JAR File”,导出ScalaWC.jar包

运行ScalaWC.jar
在这里插入图片描述

hadoop@ddai-desktop:~/workspace1$ spark-submit --master spark://ddai-master:7077 --class WordCount ScalaWC.jar /input > WordCount.txt

查看运行结果
在这里插入图片描述

Java语言实现

图形窗口运行Scala-Ide
选择“New”→“Other…”,选择“Java Project”,输入项目名(Project Name):JavaWC
单击“JavaWC”→“src”,单击“New”→“Class”,输入Name:WordCount,输入代码

import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
public final class WordCount {
  private static final Pattern SPACE = Pattern.compile(" ");
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaWordCount")
      .getOrCreate();
    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String s) {
        return Arrays.asList(SPACE.split(s)).iterator();
      }
    });
    JavaPairRDD<String, Integer> ones = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<>(s, 1);
        }
      });
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    spark.stop();
  }
}

添加“/opt/spark-2.1.0-bin-hadoop2.7/jars”目录下所有jar包
右击“JavaWC”→“src”→“WordCount.java”,单击“Export…”,选择“Java”→“JAR File”,导出JavaWC.jar包
运行JavaWC.jar

hadoop@ddai-desktop:~/workspace1$ spark-submit --master spark://ddai-master:7077 --class WordCount JavaWC.jar /input >WordCount2.txt

查看运行结果
在这里插入图片描述

python语言实现

需要先搭建python工具,请先看下一节

from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)
    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
    spark.stop()
hadoop@ddai-desktop:~/workspace1$ spark-submit --master spark://ddai-master:7077 --name PythonWC WordCount.py /input > WordCount3.txt

搭建python环境

各集群节点安装python相关组件
hadoop@ddai-master:~$ sudo apt-get install g++
hadoop@ddai-master:~$ sudo apt-get install gfortran
hadoop@ddai-master:~$ sudo apt-get install python-numpy 
hadoop@ddai-master:~$ sudo apt-get install python3-scipy 
hadoop@ddai-master:~$ sudo apt-get install python3-pandas
hadoop@ddai-master:~$ sudo apt-get install python3-sklearn
hadoop@ddai-master:~$ sudo apt-get install python-dap  python-sklearn-doc ipython3
开发客户端安装python可视化组件
hadoop@ddai-desktop:~$ sudo apt-get install g++
hadoop@ddai-desktop:~$ sudo apt-get install gfortran
hadoop@ddai-desktop:~$ sudo apt-get install python3-pip
hadoop@ddai-desktop:~$ sudo apt-get install python-numpy 
hadoop@ddai-desktop:~$ sudo apt-get install python3-scipy 
hadoop@ddai-desktop:~$ sudo apt-get install python3-matplotlib
hadoop@ddai-desktop:~$ sudo apt-get install python3-pandas
hadoop@ddai-desktop:~$ sudo apt-get install python3-sklearn
hadoop@ddai-desktop:~$ sudo apt-get install python-dap  python-sklearn-doc ipython3

试用期30天的
在这里插入图片描述

hadoop@ddai-desktop:~$ sudo tar xzvf /home/hadoop/pycharm-professional-2019.3.5.tar.gz 
hadoop@ddai-desktop:~$ sudo chown -R hadoop:hadoop /opt/pycharm-2019.3.5/

实现k-means聚类算法

hadoop@ddai-desktop:~$ /opt/pycharm-2019.3.5/bin/pycharm.sh 

创建一个使用的空间,我在根目录创建了个workspace2使用

#coding=utf-8
from __future__ import print_function
import sys
import numpy as np
from pyspark.sql import SparkSession
# 把读入的数据都转化为float类型的数据
def parseVector(line):
    return np.array([float(x) for x in line.split(' ')])
# 求取该点应该分到哪个点集中去,返回的是序号
def closestPoint(p, centers):
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        # 找出p最靠近那个点
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex
if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: kmeans <file> <k> <convergeDist>", file=sys.stderr)
        exit(-1)
    spark = SparkSession\
        .builder\
        .appName("PythonKMeans")\
        .getOrCreate()
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    # 分割数据
    data = lines.map(parseVector).cache()
    K = int(sys.argv[2])
    # 阈值
    convergeDist = float(sys.argv[3])
    # 初始化K个默认的点,False表示不应大于K,1为种子。
    kPoints = data.takeSample(False, K, 1)
    tempDist = 1.0
    # 距离差大于阈值循环
    while tempDist > convergeDist:
        # 对所有数据执行map过程,最终生成的是(index, (point, 1))的rdd
        closest = data.map(
            lambda p: (closestPoint(p, kPoints), (p, 1)))
        # 执行reduce过程,把每一个K的集合的距离相加,生成(sum,num)的rdd
        pointStats = closest.reduceByKey(
            lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))
        # 生成新的聚类中心点
        newPoints = pointStats.map(
            # st[0] => K,st[1][0]距离和,st[1][0]/st[1][1]平均距离
            lambda st: (st[0], st[1][0] / st[1][1])).collect()
        # 计算一下新旧的聚类中心点的距离差的和
        tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)
        for (iK, p) in newPoints:
            # 保存为聚类中心点
            kPoints[iK] = p
    print("Final centers: " + str(kPoints))
    spark.stop()
hadoop@ddai-desktop:~/workspace2$ vim kmeans.txt
hadoop@ddai-desktop:~/workspace2$ more kmeans.txt 
1.30 2.00
2.00 1.60
3.00 2.40
4.00 5.60
5.00 4.70
6.00 5.00
7.00 2.00
8.00 2.50
8.70 1.80
hadoop@ddai-desktop:~/workspace2$ hdfs dfs -put kmeans.txt /test

hadoop@ddai-desktop:~/workspace2$ hdfs dfs -cat /test/kmeans.txt
1.30 2.00
2.00 1.60
3.00 2.40
4.00 5.60
5.00 4.70
6.00 5.00
7.00 2.00
8.00 2.50
8.70 1.80

bug

并没有找到python
在这里插入图片描述
经过查看,发现只有python3,所以pyspark需要改成python3来进行运行
在这里插入图片描述
导入环境变量进行修改

export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3

在这里插入图片描述

hadoop@ddai-desktop:~/workspace2$ spark-submit --master spark://ddai-master:7077 --name kmeans kmeans.py /test/kmeans.txt 3 0.001 > kmean.txt

bug2观察中

python版本太高,需降到3.5以下版本,现默认是3.8版本,需手动配置,建议使用低版本的ubuntu16自带python2

详细请看:https://blog.csdn.net/gdkyxy2013/article/details/80164773
在这里插入图片描述

误操作删除python版本后出问题请看:https://blog.csdn.net/stickmangod/article/details/85316142/
python3.5版本转换详解请看:https://blog.csdn.net/qq_27657429/article/details/53482595
自行下载python包编译安装请看:https://blog.csdn.net/ashiners/article/details/118754917

修改好后记得上一步的环境配置也要修改成3.5
在这里插入图片描述

 pip install --user --upgrade numpy   #将numpy更新到最新版本

再次去workspace2执行spark-submit命令

显示

Final centers:[array([2.1, 2. ]), array([7.9, 2.1]), array([5. , 5.1])]
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-18 12:46:21  更:2021-08-18 12:47:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:21:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码