After a week of tinkering I came across mrjob, which turned out to be a real gem. The tutorials that turn up online, however, all stop at the classic one-map-plus-one-reduce WordCount example (see [Parallel Neural Network Training, Part 1] for that basic implementation). Parallelizing gradient descent takes considerably more than that, so I decided to dig into how to use mrjob to parallelize the gradient descent algorithm.
Question 1: Can an mrjob job only take a single file as input?
Answer: No. An mrjob job can accept one or more files, or even a directory; append n files to the launch command and the job will process all n of them. Example: a simple WordCount job that verifies mrjob's handling of multiple input files. The code is as follows:
from mrjob.job import MRJob


class MRWordCount(MRJob):
    def mapper(self, key, line):
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    # Generate three identical test files; the job itself processes whatever
    # input files are passed on the command line.
    count = 1
    pairs = "hello\n" \
            "map\n" \
            "reduce\n" \
            "reduce\n" \
            "hello\n" \
            "world\n" \
            "map\n" \
            "reduce\n" \
            "map\n" \
            "a\n" \
            "map\n"
    for i in range(3):
        with open('/home/wuwenjing/Downloads/wordcounttest/test' + str(count) + '.txt', 'w') as f:
            f.write("%s\n" % str(pairs))
        count = count + 1
    MRWordCount.run()
Run the job from the command line:
root@ubuntu:/opt/PycharmProjects/Hadoop/WordCount
First generate the three .txt files. The output of the run is below; it counts the words across all three files:
root@ubuntu:/opt/PycharmProjects/Hadoop/WordCount
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /hadoop/hadoop-2.9.2/bin...
Found hadoop binary: /hadoop/hadoop-2.9.2/bin/hadoop
Using Hadoop version 2.9.2
Looking for Hadoop streaming jar in /hadoop/hadoop-2.9.2...
Found Hadoop streaming jar: /hadoop/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar
Creating temp directory /tmp/wordcount.wuwenjing.20210906.024254.744498
uploading working dir files to hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/files/wd...
Copying other local files to hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/files/
Running step 1 of 1...
packageJobJar: [/tmp/hadoop-unjar1871937796876565674/] [] /tmp/streamjob5389097803650413669.jar tmpDir=null
Connecting to ResourceManager at /0.0.0.0:8032
Connecting to ResourceManager at /0.0.0.0:8032
Total input files to process : 3
number of splits:3
yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
Submitting tokens for job: job_1630895970017_0001
Submitted application application_1630895970017_0001
The url to track the job: http://ubuntu:8088/proxy/application_1630895970017_0001/
Running job: job_1630895970017_0001
Job job_1630895970017_0001 running in uber mode : false
map 0% reduce 0%
map 67% reduce 0%
map 100% reduce 0%
map 100% reduce 100%
Job job_1630895970017_0001 completed successfully
Output directory: hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/output
Counters: 49
    File Input Format Counters
        Bytes Read=174
    File Output Format Counters
        Bytes Written=46
    File System Counters
        FILE: Number of bytes read=375
        FILE: Number of bytes written=812987
        FILE: Number of large read operations=0
        FILE: Number of read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=645
        HDFS: Number of bytes written=46
        HDFS: Number of large read operations=0
        HDFS: Number of read operations=12
        HDFS: Number of write operations=2
    Job Counters
        Data-local map tasks=3
        Launched map tasks=3
        Launched reduce tasks=1
        Total megabyte-milliseconds taken by all map tasks=29351936
        Total megabyte-milliseconds taken by all reduce tasks=5346304
        Total time spent by all map tasks (ms)=28664
        Total time spent by all maps in occupied slots (ms)=229312
        Total time spent by all reduce tasks (ms)=5221
        Total time spent by all reduces in occupied slots (ms)=41768
        Total vcore-milliseconds taken by all map tasks=28664
        Total vcore-milliseconds taken by all reduce tasks=5221
    Map-Reduce Framework
        CPU time spent (ms)=2820
        Combine input records=0
        Combine output records=0
        Failed Shuffles=0
        GC time elapsed (ms)=456
        Input split bytes=471
        Map input records=36
        Map output bytes=303
        Map output materialized bytes=387
        Map output records=33
        Merged Map outputs=3
        Physical memory (bytes) snapshot=1008001024
        Reduce input groups=5
        Reduce input records=33
        Reduce output records=5
        Reduce shuffle bytes=387
        Shuffled Maps =3
        Spilled Records=66
        Total committed heap usage (bytes)=694157312
        Virtual memory (bytes) snapshot=7783809024
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
job output is in hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/output
Streaming final output from hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/output...
"a" 3
"hello" 6
"map" 12
"reduce" 9
"world" 3
Removing HDFS temp directory hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498...
Removing temp directory /tmp/wordcount.wuwenjing.20210906.024254.744498...
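As an aside, the same multi-file run does not have to go through the shell: an mrjob job can also be driven from Python via make_runner(). The sketch below is only an illustration of how that might look for this example; the module name wordcount is a hypothetical assumption, and parse_output()/cat_output() are the programmatic-output API of recent mrjob releases:

from wordcount import MRWordCount  # hypothetical module name for the job defined above

# Pass the three generated files (and the hadoop runner) exactly as one would on the command line.
job = MRWordCount(args=[
    '-r', 'hadoop',
    '/home/wuwenjing/Downloads/wordcounttest/test1.txt',
    '/home/wuwenjing/Downloads/wordcounttest/test2.txt',
    '/home/wuwenjing/Downloads/wordcounttest/test3.txt',
])
with job.make_runner() as runner:
    runner.run()
    for key, value in job.parse_output(runner.cat_output()):
        print(key, value)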
Question 2: Must an mrjob job take the mapper + reducer form?
Answer: No. Any class that wraps an mrjob job can be launched by calling class.run(), which leaves the driver free to orchestrate the map and reduce phases itself. The main.py of the parallel gradient descent implementation is as follows:
from collections import defaultdict
import argparse
import glob
import importlib
import imp
import multiprocessing
import numpy as np
import os
import random
import resource
import sys
from mrjob.job import MRJob
import itertools

try:
    import simplejson as json
except ImportError:
    import json

import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)


def chunks(iterable, size=10):
    '''Split an iterable into successive lists of at most `size` elements.'''
    iterator = iter(iterable)
    for first in iterator:
        yield list(itertools.chain([first], itertools.islice(iterator, size - 1)))


def isolated_batch_call(f, arguments):
    '''Call function f in a separate process and return its results as a list.'''
    def lf(q):
        ret = f(*arguments)
        q.put(list(ret))
    # A Queue is used to communicate between processes. It has two methods, put and get:
    # put: write data; Queue.put() takes block=True and a timeout by default. With block=True
    #      the write blocks, for at most `timeout` seconds.
    # get: read data (blocking by default); Queue.get([block[, timeout]]) pops an item,
    #      waiting at most `timeout` seconds.
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=lf, args=(q,))
    p.start()
    ret = q.get()
    p.join()
    return ret


def mapreduce(input, MRSGD, batch_size=50):
    '''Simulate one MapReduce pass: map every batch in its own process, then reduce.'''
    random.seed(0)
    d = defaultdict(list)
    for pairs_generator in chunks(input, batch_size):
        pairs = list(pairs_generator)
        k, v = None, [x[1] for x in pairs]
        for k2, v2 in isolated_batch_call(MRSGD.mapper, (k, v)):
            if not isinstance(k2, (str, int, float)):
                raise Exception("Keys must be strings, ints or floats (provided '%s')!" % k2)
            d[k2].append(v2)
    keys = list(d.keys())  # materialize the view so it can be shuffled under Python 3
    random.shuffle(keys)
    for k in keys:
        random.shuffle(d[k])
    if len(keys) > 1:
        raise Exception("Only one distinct key expected from mappers.")
    k = keys[0]
    v = d[k]
    r = isolated_batch_call(MRSGD.reducer, (k, v))
    logger.info("Finished reducing phase!")
    return r


def yield_pattern(path):
    '''Yield (None, line) pairs for every line of every file matching the glob pattern.'''
    for i in glob.iglob(path):
        if os.path.isfile(i):
            with open(i, "r") as fin:
                for line in fin:
                    yield None, line


def import_from_file(f):
    '''Compile a source string and import it as a throwaway module.'''
    mod = imp.new_module("mod")
    mod.__file__ = "mod"
    mod.__package__ = ''
    code = compile(f, '', 'exec')
    exec(code, mod.__dict__)
    return mod


def evaluate(weights, test_data, transform):
    '''Return the classification accuracy of `weights` on the transformed test data.'''
    logger.info("Evaluating the solution")
    accuracy, total = 0, 0
    instances = transform(test_data[:, 1:])
    labels = test_data[:, 0].ravel()
    if not instances.shape[1] == weights.shape[1]:
        logging.error("Shapes of weight vector and transformed "
                      "data don't match")
        logging.error("%s %s", instances.shape, weights.shape)
        sys.exit(-3)
    for features, label in zip(instances, labels):
        if label * np.inner(weights, features) > 0:
            accuracy += 1
        total += 1
    return float(accuracy) / total


class MRSGD(MRJob):
    # run() overrides MRJob.run() and is written as a plain function (no `self`).
    # `input_pattern` (a glob of training files) and `test_file` (the test set path)
    # are expected to be supplied by the caller, e.g. parsed with argparse.
    def run(input_pattern):
        batch = 5000
        source_file = "/opt/PycharmProjects/Hadoop/MRjobSGD/mrjobsgd.py"
        with open(source_file, "r") as fin:
            source = fin.read()
        mod = import_from_file(source)
        input = yield_pattern(input_pattern)
        output = mapreduce(input, mod.MRSGD, batch)
        weights = np.array(output)
        if weights.shape[0] > 1:
            logging.error("Incorrect format from reducer")
            sys.exit(-2)
        test_data = np.loadtxt(test_file, delimiter=" ")
        return evaluate(weights, test_data, mod.transform)


if __name__ == "__main__":
    MRSGD.run()
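Before plugging SGD into it, it may help to exercise the mapreduce() harness above on its own. The toy snippet below is only an illustration (ToyJob and the sample lines are made up); it assumes it runs in the same module as the helpers above, under Python 3 with Linux's fork start method, and it mirrors the harness's contract that all mappers emit a single shared key:

class ToyJob(object):
    def mapper(key, value):
        # `value` is one batch of text lines; emit one partial word count under a single key.
        yield 0, sum(len(line.split()) for line in value)

    def reducer(key, values):
        # `values` collects the partial counts from every batch.
        yield sum(values)


lines = [(None, "hello map reduce"), (None, "map reduce"), (None, "hello world")]
print(mapreduce(lines, ToyJob, batch_size=2))  # -> [7], the total word count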
The mrsgd.py file of the parallel gradient descent implementation is as follows:
import numpy as np
from mrjob.job import MRJob

m = 3000              # number of random features produced by transform()
iterations = 300000   # SGD iterations per mapper
lambda_val = 1e-6     # regularization parameter
sigma = 10            # kernel bandwidth parameter used in transform()


def transform(X):
    '''Map raw features to m random Fourier features, then standardize them.'''
    np.random.seed(0)
    b = np.random.rand(m) * 2 * np.pi
    if X.ndim == 1:
        w = np.random.multivariate_normal(np.zeros(X.size), sigma**2 * np.identity(X.size), m)
    else:
        w = np.random.multivariate_normal(np.zeros(X.shape[1]), sigma**2 * np.identity(X.shape[1]), m)
    transformed = (2.0 / m)**0.5 * np.cos(np.dot(X, np.transpose(w)) + b)
    transformed = (transformed - np.mean(transformed, 0)) / np.std(transformed, 0)
    return transformed


class MRSGD(MRJob):
    # mapper and reducer are invoked by main.py as plain functions (MRSGD.mapper(key, value)),
    # which is why neither takes a `self` argument.
    def mapper(key, value):
        # `value` is a list of raw text lines: "<label> <feature_1> ... <feature_d>".
        features = np.zeros([len(value), len(value[0].split()) - 1])
        classifications = np.zeros(len(value))
        for i in range(len(value)):
            tokens = value[i].split()
            classifications[i] = tokens[0]
            features[i] = tokens[1:]
        features = transform(features)
        w = np.zeros(m)
        for i in range(1, iterations):
            w = update_weights(w, features, classifications, i)
        yield 0, w

    def reducer(key, values):
        # Average the weight vectors trained independently by the mappers.
        cumulative_weights = np.zeros(m)
        for w in values:
            cumulative_weights += w
        yield cumulative_weights / len(values)


def update_weights(w, features, classifications, t):
    # One Pegasos-style SGD step on a randomly drawn training sample.
    i = int(np.random.uniform(0, features.shape[0]))
    learning_rate = 1 / (lambda_val * t)
    new_w = (1 - learning_rate * lambda_val) * w + learning_rate * hinge_loss_gradient(w, features[i], classifications[i])
    return new_w


def hinge_loss_gradient(w, x, y):
    # Descent direction from the hinge loss: y * x when the margin is violated, 0 otherwise.
    if np.dot(w, x) * y >= 1:
        return 0
    else:
        return y * x
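To make the update rule concrete, here is a single Pegasos-style step written out on a tiny made-up sample; the 3-dimensional vectors and the values of lambda_val and t below are purely illustrative and unrelated to the m = 3000 feature space used by the job:

import numpy as np

# One SGD step with made-up numbers, mirroring update_weights()/hinge_loss_gradient().
lambda_val = 1e-6
t = 2
w = np.array([0.2, -0.1, 0.4])
x = np.array([1.0, 0.0, -1.0])
y = -1.0

eta = 1 / (lambda_val * t)           # learning_rate = 1/(lambda*t) = 500000.0
margin = np.dot(w, x) * y            # (-0.2) * (-1.0) = 0.2 < 1, so the hinge loss is active
grad = y * x if margin < 1 else 0    # same rule as hinge_loss_gradient(w, x, y)
w_new = (1 - eta * lambda_val) * w + eta * grad
print(w_new)                         # 0.5 * w + 500000 * y * x = [-499999.9, -0.05, 500000.2]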