-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreducer.go
executable file
·123 lines (91 loc) · 2.48 KB
/
reducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Works on part of dataset and saves result in file as a reducer.
package main
// Import necessary package to accomplish your work.
import (
"fmt"
"strconv"
"os"
"bufio"
"sort"
"io/ioutil"
"strings"
"log"
)
type SortWikiResult struct {
searchWiki map[string]int
Keys []string
}
func (sw *SortWikiResult) Len() int {
// TODO: Implement Len function.
return len(sw.searchWiki)
}
func (sw *SortWikiResult) Less(i, j int) bool {
// TODO: Implement Less function.
return sw.searchWiki[sw.Keys[i]] > sw.searchWiki[sw.Keys[j]]
}
func (sw *SortWikiResult) Swap(i, j int) {
// TODO: Implement Swap function.
sw.Keys[i], sw.Keys[j] = sw.Keys[j], sw.Keys[i]
}
func sortArticles(searchWiki map[string]int) []string {
// TODO: Implement sortKeys function.
var Keys []string
for key, _ := range searchWiki {
Keys = append(Keys, key)
}
SWR := SortWikiResult{searchWiki, Keys}
sort.Stable(&SWR)
return Keys
}
func getMappersFiles(jobID int) []string {
// TODO: get a list of path of files were produced
// by mappers in path /mnt/datanode/tmp/jobID.
path := "/mnt/datanode/tmp/" + strconv.Itoa(jobID)
files, _ := ioutil.ReadDir(path)
var arr []string
for _, f := range files {
arr = append(arr, f.Name())
}
return arr
}
func readFile(path string, searchWiki map[string]int) {
// TODO: read file which was written by the mapper and
// add data to searchWiki.
file, err := os.Open(path)
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
arr := strings.Fields(line)
v, _ := strconv.Atoi(arr[1])
searchWiki[arr[0]] = v
}
}
func main() {
// TODO: get one arguments in format jobID.
strjobID := os.Args[1]
jobID, _ := strconv.Atoi(strjobID)
// TODO: call getMappersFiles and save result in array.
mapFiles := getMappersFiles(jobID)
// TODO: use readFile for files of mappers.
myMap := make(map[string]int)
for i := 0; i < len(mapFiles); i++ {
readFile("/mnt/datanode/tmp/"+strconv.Itoa(jobID)+"/"+mapFiles[i], myMap)
}
// TODO: perform sort on results.
sortedWiki := sortArticles(myMap)
// TODO: save only results of top 100 in output file
// in this path pattern /mnt/datanode/tmp/jobID/output.
var top60 []string
for i := 0; i < len(sortedWiki); i++ {
top60 = append(top60, sortedWiki[i])
}
top := strings.Join(top60, "\n")
err := ioutil.WriteFile("/mnt/datanode/tmp/"+strjobID+"/output", []byte(top), 0644)
if err == nil {
fmt.Println("Data has been written")
}
}