A working Hadoop, ZooKeeper and Spark environment is required; for the detailed configuration, see the other articles in this column.
Setting up the Scala IDE environment
1. Download and install the development package. An Eclipse IDE already exists on this machine, so the Scala IDE directory has to be renamed to avoid a conflict.
hadoop@ddai-desktop:~$ sudo tar xvzf /home/hadoop/scala-SDK-4.7.0-vfinal-2.12-linux.gtk.x86_64.tar.gz
hadoop@ddai-desktop:~$ sudo chown -R hadoop:hadoop eclipse/
hadoop@ddai-desktop:~$ sudo mv eclipse /opt/eclipse2
2. Copy Spark and Scala from the master node to the desktop node
hadoop@ddai-desktop:~$ sudo scp -r hadoop@ddai-master:/opt/scala-2.12.11 /opt
hadoop@ddai-desktop:~$ sudo scp -r hadoop@ddai-master:/opt/spark-2.1.0-bin-hadoop2.7 /opt
hadoop@ddai-desktop:~$ sudo chown -R hadoop:hadoop /opt/scala-2.12.11/
hadoop@ddai-desktop:~$ sudo chown -R hadoop:hadoop /opt/spark-2.1.0-bin-hadoop2.7/
3. Modify the environment variables and make them take effect
hadoop@ddai-desktop:~$ vim /home/hadoop/.profile
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SCALA_HOME=/opt/scala-2.12.11
export SPARK_HOME=/opt/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:$SCALA_HOME/bin:$SPARK_HOME/bin
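To apply the new variables in the current shell and verify that Scala and Spark are found, for example:
hadoop@ddai-desktop:~$ source /home/hadoop/.profile
hadoop@ddai-desktop:~$ scala -version
hadoop@ddai-desktop:~$ spark-submit --version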
Word count programming
Implementation in Scala
Run the Scala IDE in the graphical desktop
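For example, assuming the renamed installation keeps Eclipse's default launcher name, the IDE can be started with:
hadoop@ddai-desktop:~$ /opt/eclipse2/eclipse &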
Choose "New" → "Scala Project" and enter the project name (Project Name): ScalaWC
Right-click "ScalaWC" → "src", click "New" → "Scala Class", enter Name: WordCount, and enter the following code:
import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: WordCount <file>")
      System.exit(1)
    }
    val spark = SparkSession
      .builder
      .appName("Scala WordCount")
      .getOrCreate()
    // Read the input file as an RDD of lines, split each line into words,
    // count each word and print the (word, count) pairs on the driver
    val line = spark.read.textFile(args(0)).rdd
    line.flatMap(_.split(" ")).map((_, 1))
      .reduceByKey(_ + _).collect().foreach(println)
    spark.stop()
  }
}
Add all the jar packages under the "/opt/spark-2.1.0-bin-hadoop2.7/jars" directory to the project's build path.
Right-click "ScalaWC", choose "Scala", click "Set the Scala Installation", and select "Fix Scala Installation: 2.10.6 (bundled)".
Right-click "ScalaWC" → "src" → "WordCount.scala", click "Export…", choose "Java" → "JAR File", and export the ScalaWC.jar package.
Run ScalaWC.jar
hadoop@ddai-desktop:~/workspace1$ spark-submit --master spark://ddai-master:7077 --class WordCount ScalaWC.jar /input > WordCount.txt
View the run results
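Since standard output was redirected, the counts end up in WordCount.txt and can be inspected with, for example:
hadoop@ddai-desktop:~/workspace1$ more WordCount.txt
Each line is a (word,count) tuple printed by the program.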
Implementation in Java
Run the Scala IDE in the graphical desktop.
Choose "New" → "Other…", select "Java Project", and enter the project name (Project Name): JavaWC.
Click "JavaWC" → "src", click "New" → "Class", enter Name: WordCount, and enter the following code:
import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

public final class WordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: JavaWordCount <file>");
            System.exit(1);
        }

        SparkSession spark = SparkSession
            .builder()
            .appName("JavaWordCount")
            .getOrCreate();

        // Read the input file as an RDD of lines
        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

        // Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> ones = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<>(s, 1);
                }
            });

        // Sum the counts for each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        // Collect the results on the driver and print them
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        spark.stop();
    }
}
Add all the jar packages under the "/opt/spark-2.1.0-bin-hadoop2.7/jars" directory to the project's build path.
Right-click "JavaWC" → "src" → "WordCount.java", click "Export…", choose "Java" → "JAR File", and export the JavaWC.jar package.
Run JavaWC.jar
hadoop@ddai-desktop:~/workspace1$ spark-submit --master spark://ddai-master:7077 --class WordCount JavaWC.jar /input >WordCount2.txt
View the run results
Implementation in Python
The Python tooling has to be set up first; see the next section.
from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    # Read the input file as an RDD of lines
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    # Split into words, map each word to (word, 1) and sum the counts
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()
hadoop@ddai-desktop:~/workspace1$ spark-submit --master spark://ddai-master:7077 --name PythonWC WordCount.py /input > WordCount3.txt
Setting up the Python environment
Install the Python-related components on every cluster node
hadoop@ddai-master:~$ sudo apt-get install g++
hadoop@ddai-master:~$ sudo apt-get install gfortran
hadoop@ddai-master:~$ sudo apt-get install python-numpy
hadoop@ddai-master:~$ sudo apt-get install python3-scipy
hadoop@ddai-master:~$ sudo apt-get install python3-pandas
hadoop@ddai-master:~$ sudo apt-get install python3-sklearn
hadoop@ddai-master:~$ sudo apt-get install python-dap python-sklearn-doc ipython3
Install the Python visualization components on the development client
hadoop@ddai-desktop:~$ sudo apt-get install g++
hadoop@ddai-desktop:~$ sudo apt-get install gfortran
hadoop@ddai-desktop:~$ sudo apt-get install python3-pip
hadoop@ddai-desktop:~$ sudo apt-get install python-numpy
hadoop@ddai-desktop:~$ sudo apt-get install python3-scipy
hadoop@ddai-desktop:~$ sudo apt-get install python3-matplotlib
hadoop@ddai-desktop:~$ sudo apt-get install python3-pandas
hadoop@ddai-desktop:~$ sudo apt-get install python3-sklearn
hadoop@ddai-desktop:~$ sudo apt-get install python-dap python-sklearn-doc ipython3
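An optional sanity check that the Python 3 packages import correctly (a minimal sketch; the import names follow the apt packages installed above):
hadoop@ddai-desktop:~$ python3 -c "import scipy, pandas, sklearn, matplotlib; print('ok')"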
Install PyCharm Professional (the 30-day trial version):
hadoop@ddai-desktop:~$ sudo tar xzvf /home/hadoop/pycharm-professional-2019.3.5.tar.gz -C /opt
hadoop@ddai-desktop:~$ sudo chown -R hadoop:hadoop /opt/pycharm-2019.3.5/
Implementing the k-means clustering algorithm
hadoop@ddai-desktop:~$ /opt/pycharm-2019.3.5/bin/pycharm.sh
Create a workspace to work in; here a workspace2 directory is used under the home directory (the prompts below show ~/workspace2), as sketched below.
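A minimal way to create it (assuming the hadoop user's home directory):
hadoop@ddai-desktop:~$ mkdir /home/hadoop/workspace2
In this directory, create kmeans.py with the following code: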
from __future__ import print_function

import sys

import numpy as np
from pyspark.sql import SparkSession


def parseVector(line):
    # Parse one line of space-separated floats into a NumPy vector
    return np.array([float(x) for x in line.split(' ')])


def closestPoint(p, centers):
    # Return the index of the center closest to point p (squared Euclidean distance)
    bestIndex = 0
    closest = float("+inf")
    for i in range(len(centers)):
        tempDist = np.sum((p - centers[i]) ** 2)
        if tempDist < closest:
            closest = tempDist
            bestIndex = i
    return bestIndex


if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: kmeans <file> <k> <convergeDist>", file=sys.stderr)
        exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonKMeans")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    data = lines.map(parseVector).cache()
    K = int(sys.argv[2])
    convergeDist = float(sys.argv[3])

    # Pick K random points as the initial centers
    kPoints = data.takeSample(False, K, 1)
    tempDist = 1.0

    while tempDist > convergeDist:
        # Assign every point to its closest center
        closest = data.map(
            lambda p: (closestPoint(p, kPoints), (p, 1)))
        # Sum the points and counts assigned to each center
        pointStats = closest.reduceByKey(
            lambda p1_c1, p2_c2: (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))
        # New center = mean of the points assigned to it
        newPoints = pointStats.map(
            lambda st: (st[0], st[1][0] / st[1][1])).collect()

        # Total movement of the centers in this iteration
        tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)

        for (iK, p) in newPoints:
            kPoints[iK] = p

    print("Final centers: " + str(kPoints))

    spark.stop()
hadoop@ddai-desktop:~/workspace2$ vim kmeans.txt
hadoop@ddai-desktop:~/workspace2$ more kmeans.txt
1.30 2.00
2.00 1.60
3.00 2.40
4.00 5.60
5.00 4.70
6.00 5.00
7.00 2.00
8.00 2.50
8.70 1.80
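If the /test directory does not exist on HDFS yet, it can be created first (an assumed extra step):
hadoop@ddai-desktop:~/workspace2$ hdfs dfs -mkdir -p /test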
hadoop@ddai-desktop:~/workspace2$ hdfs dfs -put kmeans.txt /test
hadoop@ddai-desktop:~/workspace2$ hdfs dfs -cat /test/kmeans.txt
1.30 2.00
2.00 1.60
3.00 2.40
4.00 5.60
5.00 4.70
6.00 5.00
7.00 2.00
8.00 2.50
8.70 1.80
Bug 1
The python executable was not found. On inspection, only python3 is installed, so PySpark has to be pointed at python3 to run. Fix this by exporting the following environment variables:
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3
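One way to make these settings persistent is to append them to the same profile edited earlier and re-source it (a sketch, assuming /home/hadoop/.profile is still the file in use):
hadoop@ddai-desktop:~$ echo 'export PYSPARK_PYTHON=/usr/bin/python3' >> /home/hadoop/.profile
hadoop@ddai-desktop:~$ echo 'export PYSPARK_DRIVER_PYTHON=/usr/bin/python3' >> /home/hadoop/.profile
hadoop@ddai-desktop:~$ source /home/hadoop/.profile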
hadoop@ddai-desktop:~/workspace2$ spark-submit --master spark://ddai-master:7077 --name kmeans kmeans.py /test/kmeans.txt 3 0.001 > kmean.txt
Bug 2 (under observation)
The Python version is too high and needs to be brought down to 3.5 or lower; the current default is 3.8, so manual configuration is required. Using an older release such as Ubuntu 16, which ships with Python 2, is recommended.
For details, see: https://blog.csdn.net/gdkyxy2013/article/details/80164773
If you accidentally deleted a Python version and ran into problems, see: https://blog.csdn.net/stickmangod/article/details/85316142/
For a detailed walkthrough of switching to Python 3.5, see: https://blog.csdn.net/qq_27657429/article/details/53482595
For downloading and compiling a Python package yourself, see: https://blog.csdn.net/ashiners/article/details/118754917
After the downgrade, remember to update the environment configuration from the previous step to point to 3.5 as well.
pip install --user --upgrade numpy
Go back to workspace2 and run the spark-submit command again.
It outputs:
Final centers:[array([2.1, 2. ]), array([7.9, 2.1]), array([5. , 5.1])]