Typically, the only good reason for a map task to run much longer than its peers is if it’s processing a lot more data.
The jobtracker assigns blocks in decreasing order of size to help prevent the whole job from waiting on one last mapper. If your input format is non-splittable (e.g. it's .bz2
compressed), and some files are huge, those files will take proportionally longer. If these are so imbalanced as to cause trouble, I can't offer much more than a) don't write imbalanced files, or b) switch to a splittable format.
If some map tasks are very slow, but aren’t processing proportionally more data, look for the following:
- Check the logs of the slow tasks — did they hit a patch of bad records and spend all their time rescuing a ton of exceptions?
- Some records take far longer to process than others — for example, a complex regular expression that requires a lot of backtracking.
- If the slow tasks always occur on the same machine(s), those machines might be failing.
- Check the logs for both the child processes and the daemons — are there timeouts or other errors?
For jobs with a reduce step, the number of map tasks times the heap size of each mapper should be about twice the size of the input data.
- A prior stage used one (or very few) reducers.
- You recently enlarged your cluster.
- HDFS is nearly full.
- Some data nodes are overloaded.
Each mapper is responsible for sorting the individual records it emits, allowing the reducers to just do a merge sort. It does so by filing records into a fixed-size sort buffer (the analog of the inbox sorter on the back of each elephant). Each time the sort buffer overflows, the mapper will "spill" it to disk; once finished, it merge sorts those spills to produce its final output. Those merge passes are not cheap — the full map output is re-read and re-written. If the midflight data size is several times your allocable heap, then those extra merge passes are necessary: you should smile at how well Hadoop is leveraging what RAM you have.
However, if a mapper’s midflight size even slightly exceeds the sort buffer size, it will trigger an extra spill, causing a 20-40% performance hit.
On the jobtracker, check the job's counters: compare 'Spilled bytes' against 'mapper output bytes'. If the spilled figure is around twice the output size, you're paying for that extra spill; if it's equal, or three or more times larger, you're fine.
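If there is heap to spare, enlarging the sort buffer can eliminate that extra spill. Here is a minimal Pig sketch, assuming the classic io.sort.mb property name (newer Hadoop releases call it mapreduce.task.io.sort.mb); the 256 MB value, load path, and field names are illustrative placeholders:

    -- Give the map-side sort buffer more room so each mapper's midflight
    -- data fits in a single spill. io.sort.mb is in megabytes.
    SET io.sort.mb '256';
    logs   = LOAD '/data/logs' AS (ip:chararray, bytes:long);   -- placeholder input
    by_ip  = GROUP logs BY ip;
    totals = FOREACH by_ip GENERATE group AS ip, SUM(logs.bytes) AS total_bytes;
    STORE totals INTO '/data/bytes_by_ip';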
If your map task is slightly expansive (it outputs more data than it reads), you may end up emitting, for every input block, one full output block plus one almost-empty block. For example, a task whose output is about 120% of its input will have an output block ratio of 60%: each 1.2 blocks of output occupy two blocks, so 40% of the block space is wasted and downstream map tasks will be inefficient.
- Check this by comparing (TODO: grab actual terms) HDFS bytes read with the mapper output size.
You can check the block ratio of your output dataset with hadoop fsck -blocks
(TODO: capture output)
If your map task is expansive and the block ratio is less than about 60%, you may want to set a larger min split size on the job that writes the data, so that each task's output comes closer to filling whole blocks.
Alternatively, turn up the min split size on the next stage, sized so that each of its map tasks reads a full block's worth of data even from underfull blocks (see the sketch below).
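Here is a hedged sketch of that second option for a Pig job reading the expansive output, using the classic mapred.min.split.size property (mapreduce.input.fileinputformat.split.minsize on newer releases); the 192 MB value and the paths are illustrative, not figures from the text:

    -- With a min split of ~1.5 HDFS blocks (assuming 128 MB blocks), each
    -- 1.2-block input file becomes a single split, so no mapper is stuck
    -- with only a 20%-full trailing block.
    SET mapred.min.split.size '201326592';   -- 192 MB, in bytes
    prev   = LOAD '/data/output_of_expansive_job' AS (key:chararray, val:chararray);
    by_key = GROUP prev BY key;
    counts = FOREACH by_key GENERATE group AS key, COUNT(prev) AS ct;
    STORE counts INTO '/data/next_stage_output';

Recent Pig releases can also coalesce small splits on their own (see pig.splitCombination and pig.maxCombinedSplitSize), which accomplishes much the same thing.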
If a handful of reducers run far longer than the rest, ask the following:
- Did you set a partition key?
- Can you use a finer partition key?
- Can you use a combiner?
- Are there lots of records with a NULL key?
- Here's a great way to get null keys:

      j = JOIN a BY akey LEFT OUTER, b BY bkey;
      res = FOREACH j GENERATE bkey AS key, a::aval, b::bval;
      grouped = GROUP res BY key;

  Whenever there's a key in a with no match in b, bkey will be null and the final GROUP statement will be painful. You probably meant to say … GENERATE akey AS key … (see the corrected sketch after this list).
- Does your data have intrinsically high skew? If records follow a long-tail distribution, a few keys will carry a disproportionate share of the records, and the reducers handling them will finish long after the rest.
- Do you have an "LA and New York" problem? If you're unlucky, the two largest keys might be hashed to the same reducer. Try running with one fewer reducer — it should jostle the keys onto different machines.
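To make the null-key fix above concrete, here is a minimal sketch of the corrected script; the load paths and field types are placeholders, and the FILTER is an extra precaution beyond the one-word fix the text describes:

    -- Placeholder inputs: a has (akey, aval), b has (bkey, bval).
    a = LOAD '/data/a' AS (akey:chararray, aval:chararray);
    b = LOAD '/data/b' AS (bkey:chararray, bval:chararray);
    -- Group on akey, which is always present for rows of a, rather than on
    -- bkey, which is null whenever a row of a has no match in b.
    j       = JOIN a BY akey LEFT OUTER, b BY bkey;
    res     = FOREACH j GENERATE akey AS key, a::aval AS aval, b::bval AS bval;
    -- If null keys can still creep in from elsewhere, drop them before grouping.
    res_ok  = FILTER res BY key IS NOT NULL;
    grouped = GROUP res_ok BY key;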
If you have irrevocably high skew, Pig offers a skewed join operator.
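For instance, a sketch with hypothetical relation names and paths (the USING 'skewed' clause itself is standard Pig):

    -- 'skewed' tells Pig to sample the join-key distribution and spread
    -- heavily loaded keys across several reducers instead of piling them
    -- all onto one.
    follows = LOAD '/data/follows' AS (user_id:long, followed_id:long);
    users   = LOAD '/data/users'   AS (user_id:long, name:chararray);
    joined  = JOIN follows BY followed_id, users BY user_id USING 'skewed';
    STORE joined INTO '/data/follows_with_names';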
If you are generating a huge amount of midflight data for a merely-large amount of reducer output data, you might be a candidate for a better algorithm.
In the graph analytics chapter, we talk about "counting triangles": how many of your friends-of-friends are also direct friends?
More than a million people follow Britney Spears and Justin Bieber on Twitter. If they follow each other (TODO: verify that they do), the "obvious" way of counting shared friends will result in trillions (10^12) of candidates — but only millions of results. This is an example of "multiplying the short tails" of two distributions. The graph analytics chapter shows you one pattern for handling this.
If you can’t use a better algorithm, then as they said in Jaws: "you’re going to need a bigger boat".
- Don't use swap — deprovision it.
- If you're going to use swap, set swappiness and overcommit.
Otherwise, treat the presence of any significant swap activity as if the system were out of memory, with no caches+buffers reserve.
Your system is out of memory if any of the following occurs:
- There is no remaining reserve for system caches+buffers.
- There is significant swap activity.
- The OOM killer (the operating system's Out Of Memory handler) kills processes.
For Hadoop's purposes, if the OS has no available space for caches+buffers, it has already run out of system RAM — even if it is not yet swapping or OOMing.
- Check the overcommit settings.
You may have to reduce slots, or reduce heap per slot.
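Slot counts are a tasktracker-side setting, but per-task heap can be dialed down for a single job. A hedged sketch using the classic mapred.child.java.opts property, with an illustrative 1 GB value:

    -- Shrink each task JVM so that (task slots x heap), plus the daemons,
    -- still leaves the OS a healthy caches+buffers reserve.
    SET mapred.child.java.opts '-Xmx1024m';
    -- The slot counts themselves (mapred.tasktracker.map.tasks.maximum and
    -- mapred.tasktracker.reduce.tasks.maximum) live in the tasktracker's
    -- mapred-site.xml, not in the job.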
Check for the following signs of a healthy job:
- Unless your map task is CPU-intensive, mapper task throughput should be comparable to baseline throughput.
- The number of non-local map tasks is small.
- Map tasks take more than a minute or so.
- Either 'Spilled bytes' and 'mapper output bytes' are nearly equal, or 'Spilled bytes' is three or more times 'mapper output bytes'.
- The size of each output file is not close-to-but-above the HDFS block size.
- There is enough disk space.
- The Hadoop native libraries are discovered (org.apache.hadoop.util.NativeCodeLoader: Loaded the native-hadoop library appears in the logs).
- Midstream data uses snappy compression (org.apache.hadoop.io.compress.snappy.LoadSnappy: Snappy native library is available appears in the logs); a sketch for enabling it per job follows.
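If the native libraries are loaded but a job isn't compressing its midstream data, you can request it per job. A hedged sketch using the classic property names (newer releases use mapreduce.map.output.compress and mapreduce.map.output.compress.codec):

    -- Compress midstream (map output) data with Snappy for this job.
    SET mapred.compress.map.output 'true';
    SET mapred.map.output.compression.codec 'org.apache.hadoop.io.compress.SnappyCodec';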