前提:集群已经搭建好hadoop,启动集群。
cd /usr/hadoop/hadoop-2.7.3/
sbin/start-all.sh
1、数据准备
首先我们需要有数据文档,1.data、The_Man_of_Property.txt。
head -3 1.data
2、map创建的初始
创建map.py,vi map.py
import sys
import time
import re
p = re.compile(r'\w+')
for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
time.sleep(1)
if s.strip() != "":
print(type(ss))
print(ss)
我们需要的map结果是需要统计次数的; 因此输出要改为:print("%s\t%s" % (s,1)) ,注释掉print(type(ss)) 。 执行:cat 1.data | python map.py 结果显示如下:
3、在map.py基础下创建red.py
import sys
current_word = None
sum = 0
for line in sys.stdin:
word, val = line.strip().split("\t")
if current_word == None:
current_word = word
if current_word != word:
print("%s\t%s" % (current_word, sum))
current_word = word
sum = 0
sum += int(val)
print("%s\t%s" % (current_word, str(sum)))
一个例子:
map输出:
current_word = None
sum = 0
is 1 cur=is word=is sum=0+1=1
is 1 cur=is word=is sum=1+1=2
is 1 cur=is word=today sum=2+1=3 => is 3
today 1 cur=today word=good sum=0+1=1 => today 1
good 1 cur=good sum=0+1=1 => goog 1
进行测试map,red,先注释掉time.sleep(1) 执行:cat 1.data | python map.py | sort -k1 | python red.py 结果如下:
4、map.py 重修版
通过正则表达findall(s),消除字符。 这个存在符号,不符合我们想要的结果。重新编辑map.py
import sys
import time
import re
p = re.compile(r'\w+')
for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
if len(p.findall(s)) < 1:
continue
s = p.findall(s)[0].lower()
if s.strip() != "":
print ("%s\t%s" % (s, 1))
修改完map.py,执行cat 1.data | python map.py | sort -k1 | python red.py 结果如下: 这个结果才是我们想要的!!!
5、正确完整代码:
map.py,不能有#注释解释。
import sys
import time
import re
p = re.compile(r'\w+')
for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
if len(p.findall(s)) < 1:
continue
s = p.findall(s)[0].lower()
if s.strip() != "":
print ("%s\t%s" % (s, 1))
red.py 代码, 不能有#注释解释。
import sys
current_word = None
sum = 0
for line in sys.stdin:
word, val = line.strip().split("\t")
if current_word == None:
current_word = word
if current_word != word:
print("%s\t%s" % (current_word, sum))
current_word = word
sum = 0
sum += int(val)
print("%s\t%s" % (current_word, str(sum)))
6、用脚本run.sh 一步到位 执行map.py与red.py
6.1 上传数据文档到hdfs
hadoop fs -mkdir /test
hadoop fs -mkdir /output
fs -put ./1.data /test/
fs -put ./The_Man_of_Property.txt /test/
hadoop fs -ls /test
6.2 创建 run.sh脚本
vi run.sh 内容如下:
HADOOP_CMD="/usr/hadoop/hadoop-2.7.3/bin/hadoop"
STREAM_JAR_PATH="/usr/hadoop/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar"
INPUT_FILE_PATH_1="/test/The_Man_of_Property.txt"
OUTPUT_PATH="/output/file_broadcast"
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python red.py" \
-file ./map.py \
-file ./red.py
脚本执行:sh -x run.sh 可以看到输出路径已经有文件了,我们注释掉 # $HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH。再执行 sh -x run.sh 由上图已经知道map,reduce 已经执行成功,证明这个run.sh 脚步已经一步到位。查看新的数据结果。
hadoop fs -ls /output
hadoop fs -ls /output/file_broadcast
fs -cat /output/file_broadcast/part* | tail
7、总结:
对wordcount有更一步的理解,为什么不用IDE、pycharm呢? 这是考虑到企业以前员工会在服务器进行需求操作,因此在节点进行操作锻炼。
一些小知识点:
查找文件:* --> 匹配任何字符
find /usr/hadoop -name "hadoop-streaming*"
需求:sh脚本内容有streaming,要查有关于streaming在脚本的内容
find ./ -name "*.sh" | xargs grep "streaming"
history 可以显示自己的历史输入命令
|