-
Notifications
You must be signed in to change notification settings - Fork 40
/
l13-mapreduce.html
520 lines (423 loc) · 21.1 KB
/
l13-mapreduce.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
<h1>6.824 2015 Lecture 13: MapReduce</h1>
<p><strong>Note:</strong> These lecture notes were slightly modified from the ones posted on the
6.824 <a href="http://nil.csail.mit.edu/6.824/2015/schedule.html">course website</a> from
Spring 2015.</p>
<h2>Intro</h2>
<ul>
<li>2nd trip to this paper, talk more about fault tolerance
<ul>
<li>See <a href="l01-intro.html">first lecture</a></li>
</ul></li>
<li>a real triumph of simplicity for the programmer</li>
<li>clever design tricks to get good performance</li>
</ul>
<h2>Example: Building an inverted index</h2>
<ul>
<li>you need an inverted index for a search index</li>
<li>maps keywords to documents they are found in</li>
</ul>
<p>Example:</p>
<pre><code>doc 31: I am Alex
doc 32: Alex at 8 am
</code></pre>
<p>The output that we want, is an index: for each word in the input, we want
a list of every place that word occurred (document + offset):</p>
<pre><code>alex: 31/2, 32/0 ...
am: 31/1, 32/3 ...
</code></pre>
<p>The actual map/reduce functions for building an inverted index:</p>
<pre><code>map(doc file)
split doc into words
for each word
emit(word, {doc #, offset})
reduce(word string, occurrences list<doc #, offset>)
emit(word, sorted list of ocurrences by doc # and then by offset)
</code></pre>
<h2>Input files</h2>
<p>In MapReduce, the input is stored in GFS (Google's file system)</p>
<pre><code>Input, M splits, one R reduce tasks
map function for each
split
--------- ---------------------------------
| | | | | * | | | |
--------- ----------|----------------------
| | | | | * | | | |
--------- ----------|----------------------
| | | | | * | | | |
--------- ----------|----------------------
| | . . . |
--------- . . . \
| | . . . \-----> data for a single reduce task
--------- . . . is all the data in the column
| | . . .
--------- . . .
| | . . .
--------- . . .
</code></pre>
<p>What happens if the column of data for a reduce worker not fitting in memory?
Seems like it would go to disk.</p>
<p>Note that a single reduce call happens for every unique keyword. So, in our
inverted index example, this would mean a single reduce call for the keyword
"the" which would appear probably a billion times in a large collection of
documents. Thus, this will take a while. MapReduce cannot parallelize the work
in a reduce call (lost opportunity for certain reduce functions that are
composable, like f(reduce(k, l1), reduce(k, l2)) = reduce(k, l1+l2) ).</p>
<ul>
<li>I think <em>combiner functions</em> mentioned in the paper, can alleviate this issue</li>
</ul>
<h2>Performance</h2>
<ul>
<li>it's all about data movement
<ul>
<li>pushing terrabytes of data across a cluster</li>
<li>1000 machines
<ul>
<li>can maybe push data to RAM (1GB/s) at 1000 GB/s</li>
<li>can maybe push data to disk (100MB/s) at 100 GB/s</li>
<li>network can run at 1Gbit/s = 100MB/s on a machine
<ul>
<li>for 1000 machine, the wiring is expensive and costs you speed</li>
<li>network is usually a tree with servers at the leaves and bigger
switches in the internal nodes</li>
<li>bottleneck is root switch, which runs at 18GB/s at Google</li>
</ul></li>
<li>thus, network can only push data at 18GB/s <code>=></code> <em>bottleneck</em></li>
</ul></li>
</ul></li>
</ul>
<h2>Design insights</h2>
<p>Need to cope with the network problem.</p>
<p>Distributed Shared Memory (DSM) is very flexible in that any machine can write
memory on any location in (distributed) memory. The problem is that you end up
w/ very bandwidth inefficient and latency sensitive systems. If you allow arbitrary
reads/writes to data you end up with a bunch of latency-sensitive small data
movements across the network.</p>
<p>DSM makes fault tolerance quite difficult, when a single machine dies, because
each machine can do whatever it wants (read or write any mem. loc.), so it's
hard to checkpoint the system.</p>
<p><strong>Key ideas:</strong> </p>
<ul>
<li><code>Map()</code> and <code>Reduce()</code> work on local data only.</li>
<li><code>Map()</code> and <code>Reduce()</code> only operate on big pieces of data
<ul>
<li>to amortize network cost of sending</li>
</ul></li>
<li>very little interaction between parts of the system
<ul>
<li>maps cannot talk to each other</li>
<li>reduces cannot talk to each other</li>
<li>maps and reduces cannot talk to each other
<ul>
<li>other than the implicit communication of sending the mapped data to
the reduce functions</li>
</ul></li>
</ul></li>
<li>give programmer abstract control over the network communication
<ul>
<li>some control over how keys are mapped into the reduce partitions</li>
</ul></li>
</ul>
<p>Input is typically stored striped (64MB chunks) in GFS, over a lot of disks and
machines.</p>
<ul>
<li>gotta be clever, because this would imply that Map tasks are limited by
network bandwidth</li>
</ul>
<p>MapReduce takes advantage of GFS knowledge, to actually run the map tasks locally
on the GFS machines where the file chunks are stored. <code>=></code> increase bandwitdh
to maps from 18GB/s to 100GB/s</p>
<p>Intermediate map files generated by map are also stored locally. Downside is that
there's a single copy of the data on that one machine and the reduce worker has
to talk to it only <code>=></code> limited bandwidth.</p>
<ul>
<li>if the machine stops or crashes, the data is lost, have to restart map</li>
</ul>
<p>Data in GFS is actually replicated (2 or 3 copies), and this gives MapReduce
a choice of 2-3 servers that it can run every map task on.</p>
<ul>
<li>good for load/balancing (MR master can move slow map tasks to other machines)
<ul>
<li>don't get this benefit for reduce tasks</li>
</ul></li>
</ul>
<p>Output of reduce is stored in GFS <code>=></code> reduce output is written across the network.
<code>=></code> total output of MapReduce system is 18GB/s, if that's your cross-section
bandwidth.</p>
<h2>QOTD</h2>
<p>How soon can reduce start after map emitted some data?</p>
<p>Morris: As soon as a column is filled with data <code><=></code> as soon as all the maps
are finished.</p>
<p>Apparently, you could do it as soon as a map task emits a keyword, by feeding
values as they are generated in the reduce task's iterator, but performance
can be tricky to achieve in that case.</p>
<h2>Does MapReduce scale well?</h2>
<p>One of the big benefit of a distributed system, is that you <em>might</em> be able
to speed it up by just buying more machines. Cheaper to buy machines than to
pay programmers.</p>
<p><code>nx</code> hardware => <code>nx</code> performance?, <code>n > 1</code></p>
<p>As we grow # of machines (10 fold), and input size stays constant <code>=></code> input
size has to be decreased (10 fold). Smaller splits (10x smaller).</p>
<p>If we have millions of machines, the splits can be kilobytes in size <code>=></code> network
latency will kill our performance.</p>
<p>You can't have more reduce workers than you have keys.</p>
<p>Scalability is limited by</p>
<ul>
<li>map split size</li>
<li>number of keys <code>>=</code> # of reduce workers</li>
<li>network bandwidth (need to buy more "network" too, as we buy more machines)
<ul>
<li>a really important problem</li>
</ul></li>
</ul>
<p>The answer: certainly get some scaling, but not infinite (limited by network)</p>
<h2>Fault tolerance</h2>
<p>Challenge: if you run big jobs on 1000s of computers, you are sure to get some
failures. So cannot simply restart whole computation. Must just redo failed
machine's work.</p>
<p>Difficult to achieve for DSM, easier for MapReduce.</p>
<p>Assuming independent failures (also because maps/reduces are independent)</p>
<p>If worker failed:</p>
<ul>
<li>can just restart</li>
<li>can save intermediate output and resume after failure</li>
</ul>
<p>If maps fail, we have to rerun it, because it stores its output on the same machine,
which is done. Master knows what the map was working on, so it can just restart.</p>
<p>If a reduce worker crashes, because they store their output on GFS, on replicated
different servers. We have a good chance of not having to recompute, if the
reduce worker finished.</p>
<h2>Paper's performance eval</h2>
<p>Figure 2 in paper. Why does the bandwidth take 60 seconds to achieve 30GB/s?</p>
<p>The MR job has 1800 mappers, and some <em>poor</em> master that has to give work to each
one. So maybe the master takes a while to contact everyone.</p>
<p>Why only 30GB/s? These are map tasks so no network overhead. Maybe the CPU is the
limit? Unlikely. Seems like this is a disk bandwidth issue. 30GB/s / 1800 machines
<code>=></code> 17MB/s per disk</p>
<p>Figure 3 in paper. 800 secs for sorting 1TB of data <code>=></code> 1.25GB/s sort throughput</p>
<p>One thing to notice is that the terrabyte of data fits in the memory of the
1800 machines. </p>
<p>On a single machine with enough memory, Morris extrapolated that it would take
around 30,000 seconds to sort 1TB of data (takes 2500secs to sort 100GB)</p>
<p>Middle graph says they are only able to move data across the network at 5GB/s.
Simply moving 1TB of data will take 200 seconds. And MapReduces moves it more than
once: from maps to reduce, from reduce to GFS (multiple times for replication)</p>
<p><em>Important insight:</em> Computation involves moving data. Not just CPU cycles.</p>
<h1>6.824 original notes</h1>
<pre><code> Why MapReduce?
Second look for fault tolerance and performance
Starting point in current enthusiasm for big cluster computing
A triumph of simplicity for programmer
Bulk orientation well matched to cluster with slow network
Very influential, inspired many successors (Hadoop, Spark, &c)
Cluster computing for Big Data
1000 computers + disks
a LAN
split up data+computation among machines
communicate as needed
similar to DSM vision but much bigger, no desire for compatibilty
Example: inverted index
e.g. index terabytes of web pages for a search engine
Input:
A collection of documents, e.g. crawled copy of entire web
doc 31: i am alex
doc 32: alex at 8 am
Output:
alex: 31/3 32/1 ...
am: 31/2 32/4 ...
Map(document file i):
split into words
for each offset j
emit key=word[j] value=i/j
Reduce(word, list of d/o)
emit word, sorted list of d/o
Diagram:
* input partitioned into M splits on GFS: A, B, C, ...
* Maps read local split, produce R local intermediate files (A0, A1 .. AR)
* Reduce # = hash(key) % R
* Reduce task i fetches Ai, Bi, Ci -- from every Map worker
* Sort the fetched files to bring same key together
* Call Reduce function on each key's values
* Write output to GFS
* Master controls all:
Map task list
Reduce task list
Location of intermediate data (which Map worker ran which Map task)
Notice:
Input is huge -- terabytes
Info from all parts of input contributes to each output index entry
So terabytes must be communicated between machines
Output is huge -- terabytes
The main challenge: communication bottleneck
Three kinds of data movement needed:
Read huge input
Move huge intermediate data
Store huge output
How fast can one move data?
RAM: 1000*1 GB/sec = 1000 GB/sec
disk: 1000*0.1 GB/sec = 100 GB/sec
net cross-section: 10 GB/sec
Explain host link b/w vs net cross-section b/w
How to cope with communication bottleneck
Locality: split storage and computation the same way, onto same machines
Because disk and RAM are faster than the network
Batching: move megabytes at a time, not e.g. little key/value puts/gets
Because network latency is a worse problem than network throughput
Programming: let the developer indicate how data should move between machines
Because the most powerful solutions lie with application structure
The big programming idea in MapReduce is the key-driven shuffle
Map function implicitly specifies what and where data is moved -- with keys
Programmer can control movement, but isn't burdened with details
Programs are pretty constrained to help with communication:
Map can only read the local split of input data, for locality and simplicity
Just one batch shuffle per computation
Reduce can only look at one key, for locality and simplicity
Where does MapReduce input come from?
Input is striped+replicated over GFS in 64 MB chunks
But in fact Map always reads from a local disk
They run the Maps on the GFS server that holds the data
Tradeoff:
Good: Map can read at disk speed, much faster than reading over net from GFS server
Bad: only two or three choices of where a given Map can run
potential problem for load balance, stragglers
Where does MapReduce store intermediate data?
On the local disk of the Map server (not in GFS)
Tradeoff:
Good: local disk write is faster than writing over network to GFS server
Bad: only one copy, potential problem for fault-tolerance and load-balance
Where does MapReduce store output?
In GFS, replicated, separate file per Reduce task
So output requires network communication -- slow
The reason: output can then be used as input for subsequent MapReduce
The Question: How soon after it receives the first file of
intermediate data can a reduce worker start calling the application's
Reduce function?
Why does MapReduce postpone choice of which worker runs a Reduce?
After all, might run faster if Map output directly streamed to reduce worker
Dynamic load balance!
If fixed in advance, one machine 2x slower -> 2x delay for whole computation
and maybe the rest of the cluster idle/wasted half the time
Will MR scale?
Will buying 2x machines yield 1/2 the run-time, indefinitely?
Map calls probably scale
2x machines -> each Map's input 1/2 as big -> done in 1/2 the time
but: input may not be infinitely partitionable
but: tiny input and intermediate files have high overhead
Reduce calls probably scale
2x machines -> each handles 1/2 as many keys -> done in 1/2 the time
but: can't have more workers than keys
but: limited if some keys have more values than others
e.g. "the" has vast number of values for inverted index
so 2x machines -> no faster, since limited by key w/ most values
Network may limit scaling, if large intermediate data
Must spend money on faster core switches as well as more machines
Not easy -- a hot R+D area now
Stragglers are a problem, if one machine is slow, or load imbalance
Can't solve imbalance w/ more machines
Start-up time is about a minute!!!
Can't reduce that no matter how many machines you buy (probably makes it worse)
More machines -> more failures
Now let's talk about fault tolerance
The challenge: paper says one server failure per job!
Too frequent for whole-job restart to be attractive
The main idea: Map and Reduce are deterministic and functional,
so MapReduce can deal with failures by re-executing
Often a choice:
Re-execute big tasks, or
Save output, replicate, use small tasks
Best tradeoff depends on frequency of failures and expense of communication
What if a worker fails while running Map?
Can we restart just that Map on another machine?
Yes: GFS keeps copy of each input split on 3 machines
Master knows, tells Reduce workers where to find intermediate files
If a Map finishes, then that worker fails, do we need to re-run that Map?
Intermediate output now inaccessible on worker's local disk.
Thus need to re-run Map elsewhere *unless* all Reduce workers have
already fetched that Map's output.
What if Map had started to produce output, then crashed:
Will some Reduces see Map's output twice?
And thus produce e.g. word counts that are too high?
What if a worker fails while running Reduce?
Where can a replacement worker find Reduce input?
If a Reduce finishes, then worker fails, do we need to re-run?
No: Reduce output is stored+replicated in GFS.
Load balance
What if some Map machines are faster than others?
Or some input splits take longer to process?
Don't want lots of idle machines and lots of work left to do!
Solution: many more input splits than machines
Master hands out more Map tasks as machines finish
Thus faster machines do bigger share of work
But there's a constraint:
Want to run Map task on machine that stores input data
GFS keeps 3 replicas of each input data split
So only three efficient choices of where to run each Map task
Stragglers
Often one machine is slow at finishing very last task
h/w or s/w wedged, overloaded with some other work
Load balance only balances newly assigned tasks
Solution: always schedule multiple copies of very last tasks!
How many Map/Reduce tasks vs workers should we have?
They use M = 10x number of workers, R = 2x.
More => finer grained load balance.
More => less redundant work for straggler reduction.
More => spread tasks of failed worker over more machines, re-execute faster.
More => overlap Map and shuffle, shuffle and Reduce.
Less => big intermediate files w/ less overhead.
M and R also maybe constrained by how data is striped in GFS.
e.g. 64 MByte GFS chunks means M needs to total data size / 64 MBytes
Let's look at paper's performance evaluation
Figure 2 / Section 5.2
Text search for rare 3-char pattern, just Map, no shuffle or reduce
One terabyte of input
1800 machines
Figure 2 x-axis is time, y-axis is input read rate
60 seconds of start-up time are *omitted*! (copying program, opening input files)
Why does it take so long (60 seconds) to reach the peak rate?
Why does it go up to 30,000 MB/s? Why not 3,000 or 300,000?
That's 17 MB/sec per server.
What limits the peak rate?
Figure 3(a) / Section 5.3
sorting a terabyte
Should we be impressed by 800 seconds?
Top graph -- Input rate
Why peak of 10,000 MB/s?
Why less than Figure 2's 30,000 MB/s? (writes disk)
Why does read phase last abt 100 seconds?
Middle graph -- Shuffle rate
How is shuffle able to start before Map phase finishes? (more map tasks than workers)
Why does it peak at 5,000 MB/s? (??? net cross-sec b/w abt 18 GB/s)
Why a gap, then starts again? (runs some Reduce tasks, then fetches more)
Why is the 2nd bump lower than first? (maybe competing w/ overlapped output writes)
Lower graph -- Reduce output rate
How can reduces start before shuffle has finished? (again, shuffle gets all files for some tasks)
Why is output rate so much lower than input rate? (net rather than disk; writes twice to GFS)
Why the gap between apparent end of output and vertical "Done" line? (stragglers?)
What should we buy if we wanted sort to run faster?
Let's guess how much each resource limits performance.
Reading input from disk: 30 GB/sec = 33 seconds (Figure 2)
Map computation: between zero and 150 seconds (Figure 3(a) top)
Writing intermediate to disk: ? (maybe 30 Gb/sec = 33 seconds)
Map->Reduce across net: 5 GB/sec = 200 seconds
Local sort: 2*100 seconds (gap in Figure 3(a) middle)
Writing output to GFS twice: 2.5 GB/sec = 400 seconds
Stragglers: 150 seconds? (Figure 3(a) bottom tail)
The answer: the network accounts for 600 of 850 seconds
Is it disappointing that sort harnesses only a small fraction of cluster CPU power?
After all, only 200 of 800 seconds were spent sorting.
If all they did was sort, they should sell CPUs/disks and buy a faster network.
Modern data centers have relatively faster networks
e.g. FDS's 5.5 terabits/sec cross-section b/w vs MR paper's 150 gigabits/sec
while CPUs are only modestly faster than in MR paper
so today bottleneck might have shifted away from net, towards CPU
For what applications *doesn't* MapReduce work well?
Small updates (re-run whole computation?)
Small unpredictable reads (neither Map nor Reduce can choose input)
Multiple shuffles (can use multiple MR but not very efficient)
In general, data-flow graphs with more than two stages
Iteration (e.g. page-rank)
MapReduce retrospective
Single-handedly made big cluster computation popular
(though coincident with big datacenters, cheap machines, data-oriented companies)
Hadoop is still very popular
Inspired better successors (Spark, DryadLINQ, &c)
</code></pre>