When you launch a job (with pig, wukong run, or hadoop jar), it starts a local process that

- prepares a synthesized configuration from the config files of the program and the machine (core-site.xml, hdfs-site.xml, mapred-site.xml).
- asks the jobtracker for a job ID.
- pushes your program and its assets (jars, script files, distributed cache contents) into the job’s directory on the HDFS.
- asks the jobtracker to enqueue the job.
After a few seconds you should see the job appear on the jobtracker interface. The jobtracker will begin dispatching the job to workers with free slots, as directed by its scheduler [1]. It knows where all the input blocks are, and will try to launch each task on the same machine as its input ("bring the compute to the data"). The jobtracker will tell you how many map tasks are non-local (launched on a different machine than their input); if that count isn’t harmlessly small, see [many_non_local_mappers].
The launching process doesn’t take many resources, so for a development cluster it’s OK to launch a job from a worker machine. Terminating the launch process won’t affect the job’s execution, but its output is useful. To record that output even if you log off, use the nohup command:
nohup [...normal launch command...] >> /tmp/my_job-`date +%F`.log 2>&1 &
Run tail -f /tmp/my_job-*.log to keep watching the job’s progress.
Note: The job draws its default configuration from the launch machine’s config files. Make sure those defaults don’t conflict with appropriate values for the workers that will actually execute the job! One great way to screw this up is to launch a job from your dev machine, go to dinner, and come back to find it using one reducer and a tiny heap size. Another is to start your job from a master that is provisioned differently from the workers.
Input files are split and assigned to mappers.
Each mapper will receive a chunk bounded by:

- The file size — normally, each mapper handles at most one file (and typically, one part of a very large file). (footnote: Pig will pre-combine small files into single map inputs with the pig.splitCombination commandline parameter.)
- Min split size — up to the size of each file, you can force Hadoop to make each split larger than mapred.min.split.size (see the sketch after this list).
- Block size — the natural unit of data to feed each map task is the size of an HDFS file chunk; this is what lets Hadoop "bring the compute to the data".
- Input format — some input formats are non-splittable (by necessity, as for some compression formats; or by choice, when you want to enforce no file splits). [2]
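How these bounds combine is easiest to see as arithmetic. The sketch below (Python, with invented names; not Hadoop’s actual code) shows the clamping rule a FileInputFormat-style splitter applies: the block size, pushed up by the minimum split size and capped by the maximum.

```python
# Illustrative sketch of split sizing (invented names, not Hadoop's API).
def split_size(block_size, min_split_size=1, max_split_size=float('inf')):
    # Clamp the block size between the configured min and max split sizes.
    return max(min_split_size, min(max_split_size, block_size))

MB = 1024 * 1024
print(split_size(128 * MB) // MB)                           # 128: one split per block
print(split_size(128 * MB, min_split_size=900 * MB) // MB)  # 900: each split spans several blocks
```

Raising the minimum split size above the block size is exactly how you produce the larger, less-local splits explored in the exercise below.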
Exercises:

- Create a 2 GB file with a 128 MB block size on the HDFS. Run wu-stream cat cat --min_split_mb=1900 on it. How many map tasks will launch? What will the "non-local" cell on the jobtracker report? Try it with a min split of 1900 MB, and also with values of 128, 120, 130, 900 and 1100.
If it’s a Hadoop "streaming" job (Wukong, MrJob, etc.), the child process is a Java jar that itself hosts your script file (a minimal mapper sketch follows this list):

- it forks the script in a new process. The child ulimit applies to this script, but the heap size and other child-process configs do not.
- it passes all the Hadoop configs as environment variables, changing the '.' dots to '_' underbars. Some useful examples:
  - map_input_file — the file this task is processing
  - map_input_start — the offset within that file
  - mapred_tip_id — the task ID. This is a useful ingredient in a unique key, or if for some reason you want each mapper’s output to go to a distinct reducer partition.
- it directs its input to the script’s STDIN. Not all input formats are streaming-friendly.
- anything the script sends to its STDOUT becomes the jar’s output.
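Here is that contract made concrete: a minimal streaming mapper sketch in Python (any language that reads STDIN and writes STDOUT works the same way; the output layout here is just an example).

```python
#!/usr/bin/env python
# Minimal Hadoop Streaming mapper sketch: configs arrive as environment
# variables (dots turned into underbars), records arrive on STDIN, and
# anything written to STDOUT becomes the mapper's output.
import os
import sys

input_file = os.environ.get('map_input_file', 'unknown')  # file this task is processing
task_id    = os.environ.get('mapred_tip_id', 'unknown')   # this task's ID

for line in sys.stdin:
    record = line.rstrip('\n')
    # emit key TAB value; the key here carries the source file and task ID
    print('%s:%s\t%s' % (input_file, task_id, record))
```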
Once the maps start, it’s normal for them to seemingly sit at 0% progress for a little while: they don’t report back until a certain amount of data has passed through. Annoyingly, jobs with gzipped input will remain mute until they are finished (and then go instantly from 0 to 100%).
Exercise: Write a mapper that ignores its input but emits a configurable number of bytes, with a configurable number of bytes per line. Run it with one mapper and one reducer. Compare what happens when the output is just below, and just above, each of these thresholds:

- the HDFS block size
- the mapper sortbuf spill threshold
- the mapper sortbuf data threshold
- the mapper sortbuf total threshold
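A possible skeleton for that mapper, to save you the boilerplate (the flag names are invented for this sketch; wire them up however your streaming framework prefers):

```python
#!/usr/bin/env python
# Exercise skeleton: ignore STDIN, emit a configurable volume of output.
# --total-bytes and --bytes-per-line are made-up flag names for this sketch.
import argparse
import sys

parser = argparse.ArgumentParser()
parser.add_argument('--total-bytes',    type=int, default=256 * 1024 * 1024)
parser.add_argument('--bytes-per-line', type=int, default=100)
args = parser.parse_args()

sys.stdin.read()                          # drain and discard the input
line = 'x' * (args.bytes_per_line - 1)    # leave one byte for the newline
emitted = 0
while emitted < args.total_bytes:
    sys.stdout.write(line + '\n')
    emitted += args.bytes_per_line
```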
For jobs that have a reducer, the total size of the output dataset divided by the number of reducers determines the size of your output files [3]. Of course, if your working dataset is less than a few hundred MB, this doesn’t matter.
If your working set is large enough to care about and smaller than about 10 TB, choose a reducer count that gives output files of about 1 to 2 GB.
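The back-of-the-envelope arithmetic looks like this (the dataset size below is invented for illustration):

```python
# Pick a reducer count that lands each output file in the 1-2 GB sweet spot.
import math

GB = 1024 ** 3
total_output_size = 3 * 1024 * GB            # suppose the job writes about 3 TB
target_file_size  = 2 * GB

n_reducers = int(math.ceil(total_output_size / float(target_file_size)))
print(n_reducers)                            # 1536 reducers -> roughly 2 GB per output file
```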
- Number of mappers: by default, Hadoop will launch one mapper per HDFS block; it won’t assign more than one file to each mapper [4]. More than a few thousand map tasks in a single job starts to strain the jobtracker.
- Reducer efficiency: as explained later (TODO: ref reducer_size), your reducers are most efficient at 0.5 to 2 GB.
- HDFS block size (>= 1-2 GB): a typically-seen Hadoop block size is 128 MB; as you’ll see later, there’s a good case for even larger block sizes. You’d like each file to hold 4 or more blocks.
- Your network connection (< 4 GB): a mid-level US internet connection will download a 4 GB file segment in about 10 minutes, upload it in about 2 hours.
- A DVD (< 4 GB): a DVD holds about 4 GB. I don’t know if you use DVDs still, but it’s a data point.
- Cloud file stores (< 5 GB): the Amazon S3 system now allows files greater than 5 GB, but it requires a special multi-part upload transfer.
- Browsability: a 1 GB file has about a million 1 kB records.
Even if you don’t find any of those compelling enough to hang your hat on, I’ll just say that files of 2 GB are large enough to be efficient and small enough to be manageable; they also avoid those upper limits even with natural variance in reduce sizes.
There’s a tradeoff:
If you set your min-split-size larger than your block size, you’ll get non-local map tasks, which puts a load on your network.
However, if you let it launch one map task per block, you’ll have two problems. First, one mapper per HDFS block can mean a very large number of tasks: a 1 TB input dataset in 128 MB HDFS blocks requires about 8,000 map tasks. Make sure your map task runtimes aren’t swamped by job startup times and that your jobtracker heap size has been configured to handle that task count. Second, if your job is ever-so-slightly expansive — if it turns a 128 MB input block into a 130 MB output file — then you will double the block count of the dataset: twice as many blocks for the namenode to track, and twice the count of mappers in any subsequent stage.
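Here is that second problem in round numbers (illustrative arithmetic only):

```python
# A slightly-expansive job: each 128 MB input block becomes a 130 MB output file.
MB = 1024 ** 2
TB = 1024 ** 4
block_size = 128 * MB

input_blocks    = (1 * TB) // block_size              # 8192 map tasks for 1 TB of input
output_file     = 130 * MB
blocks_per_file = -(-output_file // block_size)       # ceiling division: 2 blocks per output file
print(input_blocks, input_blocks * blocks_per_file)   # 8192 input blocks -> 16384 output blocks
```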
My recommendation: (TODO: need to re-confirm with numbers; current readers please take with a grain of salt.)
To learn more, see the