Lab Preparation
Read the paper "MapReduce: Simplified Data Processing on Large Clusters". It is best to work in a Unix environment; I ran this lab on my own Tencent Cloud server running Ubuntu. Install Go, then clone the lab code locally and name the directory 6.824 (any other name works too):
git clone git://g.csail.mit.edu/6.824-golabs-2018 6.824
Code Structure
This is the first lab, lab-1, i.e. the MapReduce lab, so the code involved lives mainly in the mapreduce folder. Start with test_test.go. Besides the pre-written MapFun and ReduceFun functions, this file mainly exists for the final testing: it contains six test functions, TestSequentialSingle, TestSequentialMany, TestParallelBasic, TestParallelCheck, TestOneFailure, and TestManyFailures. Once you have written your own code, you can run an individual test from the mapreduce directory with a command such as
go test -run TestSequentialSingle
or run every function whose name starts with Test at once:
go test
Next, look at common_map.go and common_reduce.go. These two files are the core of the lab and the first ones you need to fill in; if all goes well, once they are written you can run TestSequentialSingle and TestSequentialMany to check your work. Finally there is schedule.go, which implements the core of concurrent task scheduling and is the other piece of code you need to write.
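As background for doMap: each intermediate key is assigned to one of the nReduce partitions by hashing, so every occurrence of the same key lands in the same reduce task. A minimal sketch, where ihash mirrors the FNV-1a helper the skeleton provides and the key list is illustrative:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash mirrors the hash helper used by the lab skeleton.
func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	nReduce := 3
	// Equal keys always hash to the same partition, so reduce task i
	// only has to read intermediate file i from each map task.
	for _, key := range []string{"apple", "banana", "apple"} {
		fmt.Printf("%s -> reduce task %d\n", key, ihash(key)%nReduce)
	}
}
```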
Writing the Code
Add the following code to the doMap function in common_map.go:
file, errOpenFile := os.OpenFile(inFile, os.O_RDONLY, 0644)
if errOpenFile != nil {
    fmt.Println("open file failure ", errOpenFile)
}
defer file.Close()
FileContent, errReadFile := ioutil.ReadAll(file)
if errReadFile != nil {
    fmt.Println("Read file failure ", errReadFile)
}
ReduceKeyValue := mapF(inFile, string(FileContent))
for i := 0; i < nReduce; i++ {
    ReduceName := reduceName(jobName, mapTask, i)
    mapOutFile, errOutFile := os.OpenFile(ReduceName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
    if errOutFile != nil {
        fmt.Println(ReduceName, "output file open failure ", errOutFile)
    }
    encoder := json.NewEncoder(mapOutFile)
    for _, value := range ReduceKeyValue {
        if ihash(value.Key)%nReduce == i {
            err := encoder.Encode(&value)
            if err != nil {
                fmt.Println("File write failure", err)
            }
        }
    }
    mapOutFile.Close()
}
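To see why doMap writes JSON and how doReduce will read it back, here is a self-contained sketch of the encode/decode round trip. KeyValue mirrors the lab's pair type; roundTrip is a hypothetical helper for illustration, with a bytes.Buffer standing in for one intermediate file:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// KeyValue mirrors the pair type used throughout the lab.
type KeyValue struct {
	Key   string
	Value string
}

// roundTrip writes pairs the way doMap does (one JSON object per
// record) and reads them back the way doReduce does: decode in a
// loop until the decoder returns an error at end of input.
func roundTrip(kvs []KeyValue) []KeyValue {
	var buf bytes.Buffer // stands in for one mrtmp.* intermediate file
	enc := json.NewEncoder(&buf)
	for _, kv := range kvs {
		enc.Encode(&kv)
	}
	dec := json.NewDecoder(&buf)
	var got []KeyValue
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err != nil {
			break // io.EOF ends the stream
		}
		got = append(got, kv)
	}
	return got
}

func main() {
	in := []KeyValue{{"a", "1"}, {"b", "1"}, {"a", "1"}}
	fmt.Println(len(roundTrip(in)), "records read back")
}
```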
Add the following code to the doReduce function in common_reduce.go:
var reduceData []KeyValue
var ReduceInputFileName string
for i := 0; i < nMap; i++ {
    ReduceInputFileName = reduceName(jobName, i, reduceTask)
    file, errOpenFile := os.Open(ReduceInputFileName)
    if errOpenFile != nil {
        fmt.Println("Open reduce file failure ", errOpenFile)
    }
    decoder := json.NewDecoder(file)
    for {
        var v KeyValue
        errDecode := decoder.Decode(&v)
        if errDecode != nil {
            break
        }
        reduceData = append(reduceData, v)
    }
    file.Close()
}
if len(reduceData) == 0 {
    return // nothing to reduce for this task
}
var sortReduceData keyValues = reduceData[:]
sort.Sort(sortReduceData)
ReduceOutFile, errOutput := os.OpenFile(outFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if errOutput != nil {
    fmt.Println("output file open failure ", errOutput)
}
defer ReduceOutFile.Close()
encoder := json.NewEncoder(ReduceOutFile)
var dataForPerKey []string
key := sortReduceData[0].Key
for _, value := range sortReduceData {
    if value.Key == key {
        dataForPerKey = append(dataForPerKey, value.Value)
    } else {
        out := reduceF(key, dataForPerKey)
        encoder.Encode(KeyValue{key, out})
        key = value.Key
        dataForPerKey = dataForPerKey[:0]
        // note: append the Value, not the Key, when starting a new group
        dataForPerKey = append(dataForPerKey, value.Value)
    }
}
if len(dataForPerKey) > 0 {
    out := reduceF(key, dataForPerKey)
    encoder.Encode(KeyValue{key, out})
}
With these two pieces written you can run the two sequential tests; in a terminal, enter:
go test -run TestSequentialSingle
go test -run TestSequentialMany
If all goes well you will see results like:
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
PASS
ok _/home/ribincao/Learn/6.824labs/src/mapreduce 1.653s
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
PASS
ok _/home/ribincao/Learn/6.824labs/src/mapreduce 2.260s
In the main directory, add the following code to the mapF and reduceF functions in wc.go:
func mapF(filename string, contents string) []mapreduce.KeyValue {
    f := func(c rune) bool {
        return !unicode.IsLetter(c)
    }
    var temp = strings.FieldsFunc(contents, f)
    var ret []mapreduce.KeyValue
    for _, value := range temp {
        ret = append(ret, mapreduce.KeyValue{Key: value, Value: "1"})
    }
    return ret
}

func reduceF(key string, values []string) string {
    if len(values) == 0 {
        fmt.Println("values = 0")
    }
    var count int
    for _, v := range values {
        n, err := strconv.Atoi(v)
        if err != nil {
            fmt.Println("v is not a number", v)
        }
        count += n
    }
    return strconv.Itoa(count)
}
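The mapF above leans on strings.FieldsFunc to tokenize: every non-letter rune acts as a separator, so punctuation and digits never end up inside a word. A quick standalone check of that behavior (splitWords is a hypothetical wrapper for illustration):

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// splitWords splits contents exactly the way mapF does: every rune
// that is not a letter is treated as a field separator.
func splitWords(contents string) []string {
	return strings.FieldsFunc(contents, func(c rune) bool {
		return !unicode.IsLetter(c)
	})
}

func main() {
	// Punctuation and the digits "42" all act as separators here.
	fmt.Println(splitWords("Hello, world! 42 times: hello."))
}
```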
Add the following code to the schedule function in schedule.go:
var threadMaster sync.WaitGroup
for i := 0; i < ntasks; i++ {
    threadMaster.Add(1)
    go func(inputFile string, taskNumber int) {
        defer threadMaster.Done()
        for {
            rpcAdr := <-registerChan
            ok := call(rpcAdr, "Worker.DoTask", DoTaskArgs{
                JobName:       jobName,
                File:          inputFile,
                Phase:         phase,
                TaskNumber:    taskNumber,
                NumOtherPhase: n_other,
            }, nil)
            if ok {
                // re-register the worker in its own goroutine so this
                // task does not block when no one is reading registerChan
                go func() {
                    registerChan <- rpcAdr
                }()
                break
            }
        }
    }(mapFiles[i], i)
}
threadMaster.Wait()
fmt.Printf("Schedule: %v done\n", phase)
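The scheduling pattern is easier to see in miniature: task goroutines pull idle workers from a channel, retry a failed task on a fresh worker, and a WaitGroup holds schedule open until every task has succeeded. The sketch below replaces the lab's RPC call with a stub that fails exactly once; scheduleTasks and flaky are illustrative names, not part of the lab:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// scheduleTasks runs ntasks tasks over a pool of workers shared
// through a channel, retrying each failed task on another worker.
func scheduleTasks(ntasks int, workers []string, call func(string, int) bool) int {
	registerChan := make(chan string, len(workers))
	for _, w := range workers {
		registerChan <- w
	}
	var wg sync.WaitGroup
	var done int32
	for i := 0; i < ntasks; i++ {
		wg.Add(1)
		go func(task int) {
			defer wg.Done()
			for {
				w := <-registerChan // wait for an idle worker
				if call(w, task) {
					// hand the worker back without blocking this goroutine
					go func() { registerChan <- w }()
					atomic.AddInt32(&done, 1)
					return
				}
				// on failure: drop the suspect worker, retry with another
			}
		}(i)
	}
	wg.Wait()
	return int(done)
}

func main() {
	var failedOnce int32
	// stub standing in for call(): fails exactly one task attempt
	flaky := func(worker string, task int) bool {
		return !atomic.CompareAndSwapInt32(&failedOnce, 0, 1)
	}
	fmt.Println(scheduleTasks(5, []string{"worker0", "worker1"}, flaky), "tasks completed")
}
```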
Testing
Finally, run the full test suite from the terminal:
go test
If everything succeeds you will see output like the following (truncated, since there is a lot of it):
...
/var/tmp/824-1001/mr31752-worker2: given reducePhase task #9 on file 824-mrinput-9.txt (nios: 20)
/var/tmp/824-1001/mr31752-worker3: reducePhase task #8 done
/var/tmp/824-1001/mr31752-worker2: reducePhase task #9 done
Master: RPC /var/tmp/824-1001/mr31752-worker1 shutdown error
Master: RPC /var/tmp/824-1001/mr31752-worker0 shutdown error
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
Merge: read mrtmp.test-res-3
Merge: read mrtmp.test-res-4
Merge: read mrtmp.test-res-5
Merge: read mrtmp.test-res-6
Merge: read mrtmp.test-res-7
Merge: read mrtmp.test-res-8
Merge: read mrtmp.test-res-9
/var/tmp/824-1001/mr31752-master: Map/Reduce task completed
PASS
ok _/home/ribincao/Learn/6.824labs/src/mapreduce 11.113s