IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> MIT6.824 lab-1实践 -> 正文阅读

[大数据]MIT6.824 lab-1实践

实验准备

阅读论文《MapReduce: Simplified Data Processing on Large Cluster 》。
最好准备一个unix环境,本人是在自己的腾讯云服务器上进行的实验,使用的是ubuntu系统,安装好golang,将实验代码拉到本地并命名为6.824(可以自己起别的名字):

git clone git://g.csail.mit.edu/6.824-golabs-2018 6.824

代码结构

本实验是第一个实验lab-1,也就是map_reduce实验,所以这里主要用到的是mapreduce文件夹下的代码。
先从test_test.go文件看起,这个文件除了已经编写好的MapFunReduceFun函数外,主要是用来最后进行测试的,其中有TestSequentialSingleTestSequentialManyTestParallelBasicTestParallelCheckTestOneFailureTestManyFailures六个测试函数,当编写完自己的代码后可以在mapreduce目录下通过如下命令进行测试

go test -run TestSequentialSingle

或者使用如下命令对代码中以Test开头的函数全部进行测试

go test

再来看common_map.gocommon_reduce.go文件,这两个文件是这次实验的核心,也是本次实验需要编写的文件,如果顺利的话编写完这两个文件就可以测试TestSequentialSingleTestSequentialMany检查是否成功
最后再来看一下schedule.go文件,这个是用来实现并发调度的核心的代码,也是本次实验需要编写的代码。

编写代码

  1. common_map.go里的doMap函数增加代码如下
// Your code here (Part I).
//
//	1. 读取文件
file, errOpenFile := os.OpenFile(inFile, os.O_RDONLY, 0644)
if errOpenFile != nil {
	fmt.Println("open file failure ", errOpenFile)
}
defer file.Close()

FileContent, errReadFile := ioutil.ReadAll(file)
if errReadFile != nil {
	fmt.Println("Read file failure ", errReadFile)
}

//	2. 进行 map 处理
ReduceKeyValue := mapF(inFile, string(FileContent))

//	3. 根据<job_name, map_id, reduce_id>生成中间文件
for i := 0;i < nReduce;i++ {
	ReduceName := reduceName(jobName, mapTask, i)
	mapOutFile, errOutFile := os.OpenFile(ReduceName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if errOutFile != nil {
		fmt.Println(ReduceName, "output file open failure ", errOutFile)
	}

	encoder := json.NewEncoder(mapOutFile)
	for _, value := range ReduceKeyValue {
		if ihash(value.Key) % nReduce == i {
			err := encoder.Encode(&value)
			if err != nil {
				fmt.Println("File write failure", err)
			}
		}
	}
	mapOutFile.Close()
}
  1. common_reduce.go里的doReduce函数增加代码如下
// Your code here (Part I).
//
//	1. 根据<job_name, map_id, reduce_id>依次读取中间文件塞到reduceData里
var reduceData []KeyValue
var ReduceInputFileName string
for i := 0;i < nMap;i++ {
	ReduceInputFileName = reduceName(jobName, i, reduceTask)
	file, errOpenFile := os.Open(ReduceInputFileName)
	if errOpenFile != nil {
		fmt.Println("Open reduce file failure ", errOpenFile)
	}

	decoder := json.NewDecoder(file)
	for {
		var v KeyValue
		errDecode := decoder.Decode(&v)
		if errDecode != nil {
			break
		}
		reduceData = append(reduceData, v)
	}
	file.Close()
}

//	2. 对 reduceData 进行排序
var sortReduceData keyValues = reduceData[:]
sort.Sort(sortReduceData)

//	3. 把相同的 Key 对应的 value 放一起然后进行 reduce
ReduceOutFile, errOutput := os.OpenFile(outFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if errOutput != nil {
	fmt.Println("output file open failure ", errOutput)
}
defer ReduceOutFile.Close()
encoder := json.NewEncoder(ReduceOutFile)
var dataForPerKey []string
key := sortReduceData[0].Key
for _, value := range sortReduceData {
	if value.Key == key {
		dataForPerKey = append(dataForPerKey, value.Value)
	} else {
		out := reduceF(key, dataForPerKey)
		encoder.Encode(KeyValue{key, out})

		key = value.Key
		dataForPerKey = dataForPerKey[:0]
		dataForPerKey = append(dataForPerKey, value.Key)
	}
}

//	最后避免漏网之鱼2333
if len(dataForPerKey) > 0 {
	out := reduceF(key, dataForPerKey)
	encoder.Encode(KeyValue{key, out})
}

完成这两步代码的编写后就可以进行如下两个串行执行的测试了,在终端输入如下命令

go test -run TestSequentialSingle
go test -run TestSequentialMany

不出意外会出现如下结果

master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
PASS
ok      _/home/ribincao/Learn/6.824labs/src/mapreduce   1.653s
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
PASS
ok      _/home/ribincao/Learn/6.824labs/src/mapreduce   2.260s
  1. main目录下的wc.go文件的函数mapFreduceF中增加代码
func mapF(filename string, contents string) []mapreduce.KeyValue {
	// Your code here (Part II).
	f := func (c rune) bool{
		return !unicode.IsLetter(c)
	}
	var temp = strings.FieldsFunc(contents,f)
	var ret []mapreduce.KeyValue
	for _,value := range temp{
		ret = append(ret, mapreduce.KeyValue{Key:value,Value:"1"})
	}
	return ret
}
func reduceF(key string, values []string) string {
	// Your code here (Part II).
	if len(values) == 0{
		fmt.Println("values = 0")
	}
	var count int
	var ret string
	for _,v := range values{
		Int,err := strconv.Atoi(v)
		if err != nil{
			fmt.Println("v is not a numbet",v)
		}
		count += Int
	}
	ret = strconv.Itoa(count)
	return ret
}
  1. schedule.go里的schedule函数增加如下代码
// Your code here (Part III, Part IV).
//
var threadMaster sync.WaitGroup
for i := 0;i < ntasks;i++ {
	//	一个 master 并发分配任务给 worker 
	threadMaster.Add(1)
	go func(inputFile string, TaskNumber int) {
		for {
			//	registerChan 相当于一个 worker 队列, worker 出队完成 map 或者 reduce 任务之后入队
			rpcAdr := <- registerChan
			ok := call(rpcAdr, "Worker.DoTask", DoTaskArgs{
				JobName: jobName,
				File:	inputFile,
				Phase: phase,
				TaskNumber: TaskNumber,
				NumOtherPhase: n_other,
			}, nil)
			if ok {
				go func() {
					//	完成了就通知当前 worker 可用
					registerChan <- rpcAdr
				}()
				break
			}
		}
		threadMaster.Done()
	}(mapFiles[i], i)
}
fmt.Printf("Schedule: %v done\n", phase)
threadMaster.Wait()

测试

最后可以完整的测试一遍,终端运行命令

go test

成功的话会出现下面的结果(太多只截取部分内容)

...
/var/tmp/824-1001/mr31752-worker2: given reducePhase task #9 on file 824-mrinput-9.txt (nios: 20)
/var/tmp/824-1001/mr31752-worker3: reducePhase task #8 done
/var/tmp/824-1001/mr31752-worker2: reducePhase task #9 done
Master: RPC /var/tmp/824-1001/mr31752-worker1 shutdown error
Master: RPC /var/tmp/824-1001/mr31752-worker0 shutdown error
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
Merge: read mrtmp.test-res-3
Merge: read mrtmp.test-res-4
Merge: read mrtmp.test-res-5
Merge: read mrtmp.test-res-6
Merge: read mrtmp.test-res-7
Merge: read mrtmp.test-res-8
Merge: read mrtmp.test-res-9
/var/tmp/824-1001/mr31752-master: Map/Reduce task completed
PASS
ok      _/home/ribincao/Learn/6.824labs/src/mapreduce   11.113s
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-28 07:53:02  更:2021-07-28 07:55:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/7 4:53:01-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码