IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> 【神经网络并行训练(下)】mrjob的多种使用方法 -> 正文阅读

[系统运维]【神经网络并行训练(下)】mrjob的多种使用方法

经过上个星期的摸索,发现了mrjob这个神器,网络上搜出来的教程都是最传统的一个map+一个reduce的Wordcount例子,具体实现参考【神经网络并行训练(上)】
但是要实现梯度下降算法的并行化远不止这么简单,所以决定好好研究一下如何用其实现梯度下降算法并行化。

思考一:mrjob任务接受的参数只能是一个文件吗?

答案:No,mrjob任务可以接受一个或多个文件,甚至一个文件夹,在执行命令后面加n个文件就会处理n个文件。
example.通过一个简单的wordcount例子来验证一下mrjob任务接受多个文件的情况
代码如下:

#!/usr/bin/python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, key, line):
        for word in line.split():
            yield(word, 1)

    def reducer(self, word, counts):
        yield(word, sum(counts))

if __name__ == '__main__':
    count = 1
    pairs = "hello\n" \
            "map\n" \
            "reduce\n" \
            "reduce\n" \
            "hello\n" \
            "world\n" \
            "map\n" \
            "reduce\n" \
            "map\n" \
            "a\n" \
            "map\n"
    for i in range(3):
        with open('/home/wuwenjing/Downloads/wordcounttest/test' + str(count) + '.txt', 'w') as f:
            f.write("%s\n" % str(pairs))
            count = count + 1
    #先生成3个待处理文件
    MRWordCount.run()

在命令行执行任务

root@ubuntu:/opt/PycharmProjects/Hadoop/WordCount# python3 wordcount.py -r hadoop /home/wuwenjing/Downloads/wordcounttest/test1.txt /home/wuwenjing/Downloads/wordcounttest/test2.txt /home/wuwenjing/Downloads/wordcounttest/test3.txt

先生成3个.txt文件
请添加图片描述
运行结果,统计了3个文件中单词的数量

root@ubuntu:/opt/PycharmProjects/Hadoop/WordCount# python3 wordcount.py -r hadoop /home/wuwenjing/Downloads/wordcounttest/test1.txt /home/wuwenjing/Downloads/wordcounttest/test2.txt /home/wuwenjing/Downloads/wordcounttest/test3.txt
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /hadoop/hadoop-2.9.2/bin...
Found hadoop binary: /hadoop/hadoop-2.9.2/bin/hadoop
Using Hadoop version 2.9.2
Looking for Hadoop streaming jar in /hadoop/hadoop-2.9.2...
Found Hadoop streaming jar: /hadoop/hadoop-2.9.2/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar
Creating temp directory /tmp/wordcount.wuwenjing.20210906.024254.744498
uploading working dir files to hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/files/wd...
Copying other local files to hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar1871937796876565674/] [] /tmp/streamjob5389097803650413669.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0.0.0:8032
  Total input files to process : 3
  number of splits:3
  yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
  Submitting tokens for job: job_1630895970017_0001
  Submitted application application_1630895970017_0001
  The url to track the job: http://ubuntu:8088/proxy/application_1630895970017_0001/
  Running job: job_1630895970017_0001
  Job job_1630895970017_0001 running in uber mode : false
   map 0% reduce 0%
   map 67% reduce 0%
   map 100% reduce 0%
   map 100% reduce 100%
  Job job_1630895970017_0001 completed successfully
  Output directory: hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/output
Counters: 49
	File Input Format Counters 
		Bytes Read=174
	File Output Format Counters 
		Bytes Written=46
	File System Counters
		FILE: Number of bytes read=375
		FILE: Number of bytes written=812987
		FILE: Number of large read operations=0
		FILE: Number of read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=645
		HDFS: Number of bytes written=46
		HDFS: Number of large read operations=0
		HDFS: Number of read operations=12
		HDFS: Number of write operations=2
	Job Counters 
		Data-local map tasks=3
		Launched map tasks=3
		Launched reduce tasks=1
		Total megabyte-milliseconds taken by all map tasks=29351936
		Total megabyte-milliseconds taken by all reduce tasks=5346304
		Total time spent by all map tasks (ms)=28664
		Total time spent by all maps in occupied slots (ms)=229312
		Total time spent by all reduce tasks (ms)=5221
		Total time spent by all reduces in occupied slots (ms)=41768
		Total vcore-milliseconds taken by all map tasks=28664
		Total vcore-milliseconds taken by all reduce tasks=5221
	Map-Reduce Framework
		CPU time spent (ms)=2820
		Combine input records=0
		Combine output records=0
		Failed Shuffles=0
		GC time elapsed (ms)=456
		Input split bytes=471
		Map input records=36
		Map output bytes=303
		Map output materialized bytes=387
		Map output records=33
		Merged Map outputs=3
		Physical memory (bytes) snapshot=1008001024
		Reduce input groups=5
		Reduce input records=33
		Reduce output records=5
		Reduce shuffle bytes=387
		Shuffled Maps =3
		Spilled Records=66
		Total committed heap usage (bytes)=694157312
		Virtual memory (bytes) snapshot=7783809024
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
job output is in hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/output
Streaming final output from hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498/output...
"a"	3
"hello"	6
"map"	12
"reduce"	9
"world"	3
Removing HDFS temp directory hdfs:///user/wuwenjing/tmp/mrjob/wordcount.wuwenjing.20210906.024254.744498...
Removing temp directory /tmp/wordcount.wuwenjing.20210906.024254.744498...

思考二:mrjob任务是否必须是mapper+reducer的形式?

答案:No,任何包含一个mrjob任务的类都可以通过class.run()的形式去执行任务
并行梯度下降算法的main.py文件如下

from collections import defaultdict
import argparse
import glob
import importlib
import imp
import multiprocessing
import numpy as np
import os
import random
import resource
import sys
from mrjob.job import MRJob
import itertools

try:
    import simplejson as json
except:
    import json
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:
        yield list(itertools.chain([first], itertools.islice(iterator, size - 1)))

def isolated_batch_call(f, arguments):
    '''在单独的进程中调用函数 f 并返回结果列表'''
    def lf(q):
        ret = f(*arguments)
        q.put(list(ret))

    '''Queue 用来在多个进程间通信。Queue 有两个方法,get 和 put
    put:放数据,Queue.put( )默认有block=True和timeout两个参数。当block=True时,写入是阻塞式的,阻塞时间由timeout确定。
    get:取数据(默认阻塞),Queue.get([block[, timeout]])获取队列,timeout等待时间
    '''
    q = multiprocessing.Queue() #创建父进程q
    p = multiprocessing.Process(target=lf, args=(q,)) #将父进程传给子进程
    p.start() #启动子进程p,执行def lf(q),把mapper(k,v)返回的数据存在list(ret)里
    ret = q.get() #变量ret通过q.get方法获得list(ret)
    p.join() #等待p结束
    return ret #ret是一个list


def mapreduce(input, MRSGD ,batch_size=50):
    # 设置初始随机种子
    random.seed(0)
    # 运行 mappers
    d = defaultdict(list)
    for pairs_generator in chunks(input, batch_size):
        pairs = list(pairs_generator)
        k, v = None, [x[1] for x in pairs]
        for k2, v2 in isolated_batch_call(MRSGD.mapper, (k, v)):
            if not isinstance(k2, (str, int, float)):  #k2是元组(str,int,list)中的一个返回 True,如果k2不是其中一种,则报错
                raise Exception("Keys must be strings, ints or floats (provided '%s')!" % k2)
            d[k2].append(v2)
    # 键和值的随机排列
    keys = d.keys()
    random.shuffle(keys)
    for k in keys:
        random.shuffle(d[k])
    # 运行 reducers
    res = []
    if len(keys) > 1:
        raise Exception("Only one distinct key expected from mappers.")
    k = list(keys)[0]   # k为0
    v = d[k]   # v包含16个w值数组
    r = isolated_batch_call(MRSGD.reducer, (k, v))
    logger.info("Finished reducing phase!")
    return r


def yield_pattern(path):
    for i in glob.iglob(path):
        if os.path.isfile(i):
            with open(i, "r") as fin:
                for line in fin:
                    yield None, line  #按行返回

def import_from_file(f):
    mod =  imp.new_module("mod")
    mod.__file__ = "mod"
    mod.__package__ = ''
    code = compile(f,'','exec')
    exec(code, mod.__dict__)
    return mod


def evaluate(weights, test_data, transform):
    logger.info("Evaluating the solution")
    accuracy, total = 0, 0
    instances = transform(test_data[:, 1:])
    labels = test_data[:, 0].ravel()
    if not instances.shape[1] == weights.shape[1]:
        logging.error("Shapes of weight vector and transformed "
                      "data don't match")
        logging.error("%s %s", instances.shape, weights.shape)
        sys.exit(-3)
    for features, label in zip(instances, labels):
        if label * np.inner(weights, features) > 0:
            accuracy += 1
        total += 1
    return float(accuracy) / total

class MRSGD(MRJob):
    def run(input_pattern):
        batch = 5000
        source_file = "/opt/PycharmProjects/Hadoop/MRjobSGD/mrjobsgd.py"
        with open(source_file, "r") as fin:
            source = fin.read()  # 读取source_file文件
        mod = import_from_file(source) #将source文件转化为一个名为'mod'的模型,从而可以调用里面的方法和函数
        input = yield_pattern(input_pattern)
        output = mapreduce(input, mod.MRSGD, batch)
        weights = np.array(output)
        if weights.shape[0] > 1:
            logging.error("Incorrect format from reducer")
            sys.exit(-2)
        test_data = np.loadtxt(test_file, delimiter=" ")
        return evaluate(weights, test_data, mod.transform)

if __name__ == "__main__":
    MRSGD.run()

并行梯度下降算法的mrsgd.py文件如下

import numpy as np
from mrjob.job import MRJob

# constants
# m is the dimension of the transformed feature vector    # m 是变换后的特征向量的维度
m = 3000
# number of iterations of PEGASOS    #PEGASOS 的迭代次数
iterations = 300000
# regularization constant    #正则化常数
lambda_val = 1e-6
# standard deviation of p in RFF    #RFF 中 p 的标准偏差
sigma = 10

# transforms X into m-dimensional feature vectors using RFF and RBF kernel 使用 RFF 和 RBF 核将 X 转换为 m 维特征向量
# Make sure this function works for both 1D and 2D NumPy arrays. 确保此函数适用于 1D 和 2D NumPy 数组。
def transform(X):
    np.random.seed(0)
    b = np.random.rand(m) * 2 * np.pi

    if X.ndim == 1:
        w = np.random.multivariate_normal(np.zeros(X.size), sigma**2 * np.identity(X.size), m)
    else:
        w = np.random.multivariate_normal(np.zeros(X.shape[1]), sigma**2 * np.identity(X.shape[1]), m)

    transformed = (2.0 / m)**0.5 * np.cos(np.dot(X, np.transpose(w)) + b)
    # feature normalization 特征归一化
    transformed = (transformed - np.mean(transformed, 0)) / np.std(transformed, 0)

    return transformed

class MRSGD(MRJob):
    # key: None
    # value: one line of input file
    def mapper(key, value): #传进来的value是一个5000*401的列表
        # 2D NumPy array containing the original feature vectors 包含原始特征向量的 2维 NumPy 数组(训练数据)
        features = np.zeros([len(value), len(value[0].split()) - 1]) #features.shape=(5000,400)
        # 1D NumPy array containing the classifications of the training data 包含训练数据分类的一维 NumPy 数组(标签t)
        classifications = np.zeros(len(value)) #classifications.shape=(5000,)

        # populate features and classifications 填充特征和分类
        for i in range(len(value)):
            tokens = value[i].split() #tokens是一个1*400的矩阵,共循环5000次
            classifications[i] = tokens[0] #5000个标签[-1,-1,+1,……]
            features[i] = tokens[1:]

        # project features into higher dimensional space 将特征投影到更高维空间
        features = transform(features)

        # PEGASOS
        w = np.zeros(m)   # w.shape=(3000,)
        for i in range(1, iterations):   #iterations = 300000 迭代300000次
            w = update_weights(w, features, classifications, i)
        yield 0, w

    def reducer(key, values):
        cumulative_weights = np.zeros(m)

        for w in values:
            cumulative_weights += w  # 求16个w值数组的和

        # yield the average of the weights
        yield cumulative_weights / len(values)  # 求16个w值数组的均值  sum/16

# weight vector update of PEGASOS
def update_weights(w, features, classifications, t):
    i = int(np.random.uniform(0, features.shape[0]))
    learning_rate = 1 / (lambda_val * t)   #lambda_val = 1e-6 正则化常数

    new_w = (1 - learning_rate * lambda_val) * w + learning_rate * hinge_loss_gradient(w, features[i], classifications[i])
    # optional projection step 可选的投影步骤
    #new_w = min(1, ((1 / lambda_val**0.5) / np.linalg.norm(new_w))) * new_w

    return new_w


# calculate the gradient of the hinge loss function 计算铰链损失函数的梯度
def hinge_loss_gradient(w, x, y):
    if np.dot(w, x) * y >= 1:
        return 0
    else:
        return y * x
  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2021-09-06 11:30:56  更:2021-09-06 11:32:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 13:41:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码