Hadoop Execution in Detail

Launch

When you launch a job (with pig, wukong run, or hadoop jar), it starts a local process that

  • prepares a synthesized configuration from your program’s settings and the machine’s config files (core-site.xml, hdfs-site.xml, mapred-site.xml).

  • asks the jobtracker for a job ID

  • pushes your program and its assets (jars, script files, distributed cache contents) into the job’s directory on the HDFS.

  • asks the jobtracker to enqueue the job.

After a few seconds you should see the job appear on the jobtracker interface. The jobtracker will begin dispatching the job to workers with free slots, as directed by its scheduler [1]. It knows where all the input blocks are, and will try to launch each task on the same machine as its input ("bring the compute to the data"). The jobtracker will tell you how many map tasks are non-local (launched on a different machine than their input); if that count isn’t harmlessly small, see [many_non_local_mappers].
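
For concreteness, here is roughly what a bare-bones streaming launch looks like; the jar location varies by version and distribution, and the input/output paths and script names are invented for this sketch:

    hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
        -input   /data/gold/my_dataset         \
        -output  /data/out/my_job-`date +%F`   \
        -mapper  'ruby my_mapper.rb'           \
        -reducer 'ruby my_reducer.rb'          \
        -file    my_mapper.rb -file my_reducer.rb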

The launching process doesn’t take many resources, so for a development cluster it’s OK to launch a job from a worker machine. Terminating the launch process won’t affect the job execution, but its output is useful. To record its output even if you log off, use the nohup command:

    nohup [...normal launch command...] >> /tmp/my_job-`date +%F`.log 2>&1 &

Run tail -f /tmp/my_job-*.log to keep watching the job’s progress.

Note

The job draws its default configuration from the launch machine’s config files. Make sure those defaults don’t conflict with the appropriate values for the workers that will actually execute the job! One great way to screw this up is to launch a job from your dev machine, go to dinner, and come back to find it using one reducer and a tiny heap size. Another is to start your job from a master node that is provisioned differently from the workers.
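
One defensive habit is to pin the settings you care about right on the launch command rather than trusting the launch machine’s defaults; the property names below are the Hadoop 1.x mapred.* generation used throughout this chapter, and the values are only examples:

    # generic -D options must come before the streaming-specific -input/-mapper/etc. options
    hadoop jar [...] \
        -D mapred.reduce.tasks=20 \
        -D mapred.child.java.opts=-Xmx1024m \
        [...rest of launch command...]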

Split

Input files are split and assigned to mappers.

Each mapper will receive a chunk bounded by:

  • The file size — normally, each mapper handles at most one file (and typically, one part of a very large file). (footnote: Pig will pre-combine small files into single map inputs with the pig.splitCombination command-line parameter.)

  • Min split size — up to the size of each file, you can force Hadoop to make each split larger than mapred.min.split.size (see the sketch after this list).

  • Block size — the natural unit of data to feed each map task is the size of an HDFS file chunk; this is what lets Hadoop "bring the compute to the data".

  • Input format — some input formats are non-splittable (by necessity, as for some compression formats; or by choice, when you want to enforce no file splits). [2]
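
Putting those bounds together: Hadoop’s FileInputFormat picks a split size of roughly max(min split size, min(block size, any max/goal split size)), which with default settings is just the block size. A back-of-the-envelope version, with made-up numbers:

    # rough split-size arithmetic; the MB figures are illustrative
    block_mb=128 ; min_split_mb=1 ; max_split_mb=999999
    split_mb=$(( max_split_mb < block_mb ? max_split_mb : block_mb ))   # min(max, block)
    split_mb=$(( min_split_mb > split_mb ? min_split_mb : split_mb ))   # max(min, the above)
    echo "each split will be roughly ${split_mb} MB"

That is why pushing mapred.min.split.size past the block size is the lever for larger (and possibly non-local) splits.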

Exercises:

  • Create a 2GB file having a 128MB block size on the HDFS. Run wu-stream cat cat --min_split_mb=1900 on it. How many map tasks will launch? What will the "non-local" cell on the jobtracker report? Try it out for 1900, and also for values of 128, 120, 130, 900 and 1100.
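
One way to set up that input (dfs.block.size is the Hadoop 1.x property name and takes a value in bytes; the file names here are invented):

    # upload a local ~2 GB file with an explicit 128 MB block size (128 * 1024 * 1024 bytes)
    hadoop fs -D dfs.block.size=134217728 -put big_2GB_file.tsv /data/scratch/big_2GB_file.tsv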

Mappers

Hadoop Streaming (Wukong, MrJob, etc)

If it’s a Hadoop "streaming" job (Wukong, MrJob, etc), the child process is a Java jar that itself hosts your script file (a bare-bones example follows this list):

  • it forks the script in a new process. The child ulimit applies to this script, but the heap size and other child process configs do not.

  • passes all the Hadoop configs as environment variables, changing . dots to _ underbars. Some useful examples:

    • map_input_file  — the file this task is processing

    • map_input_start — the offset within that file

    • mapred_tip_id  — the task ID. This is a useful ingredient in a unique key, or if for some reason you want each mapper’s output to go to a distinct reducer partition.

  • directs its input to the script’s STDIN. Not all input formats are streaming-friendly.

  • anything the script sends to its STDOUT becomes the jar’s output.
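
To make that contract concrete, here is a bare-bones streaming mapper written as a shell script; the choice of output key is purely illustrative:

    #!/usr/bin/env bash
    # anything written to STDERR goes to the task logs, not to the job output
    echo "processing ${map_input_file:-unknown} from offset ${map_input_start:-0}" 1>&2
    while IFS= read -r line ; do
        # emit the task id as the key and the untouched record as the value
        printf '%s\t%s\n' "${mapred_tip_id:-no-task-id}" "$line"
    done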


Once the maps start, it’s normal for them to seemingly sit at 0% progress for a little while: they don’t report back until a certain amount of data has passed through. Annoyingly, jobs with gzip’ed input will remain mute until they are finished (and then go instantly from 0 to 100%).

Exercise: Write a mapper that ignores its input but emits a configurable number of bytes, with a configurable number of bytes per line. Run it with one mapper and one reducer. Compare what happens when the output is just below, and just above, each of these thresholds:

  • the HDFS block size

  • the mapper sortbuf spill threshold

  • the mapper sortbuf data threshold

  • the mapper sortbuf total threshold
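
Here is a starting point for that byte-emitting mapper, as a sketch only; TOTAL_MB and LINE_BYTES are names invented for this example (pass them with streaming’s -cmdenv option):

    #!/usr/bin/env bash
    # emit $TOTAL_MB megabytes as lines of $LINE_BYTES bytes each, ignoring the actual input
    TOTAL_MB=${TOTAL_MB:-100} ; LINE_BYTES=${LINE_BYTES:-100}
    cat > /dev/null                     # drain STDIN so the streaming jar isn't left hanging
    n_lines=$(( TOTAL_MB * 1024 * 1024 / LINE_BYTES ))
    line=$(head -c $(( LINE_BYTES - 1 )) /dev/zero | tr '\0' 'x')   # leave one byte for the newline
    for (( i = 0; i < n_lines; i++ )); do echo "$line" ; done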

Speculative Execution

For exploratory work, it’s worth

Choosing a file size

Jobs with Map and Reduce

For jobs that have a reducer, the total size of the output dataset divided by the number of reducers implies the size of your output files [3]. Of course, if your working dataset is less than a few hundred MB, this doesn’t matter.

If your working set is large enough to care about and less than about 10 TB, choose a reducer count that yields output files of about 1 to 2 GB.
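
For example (the figures are invented for illustration): a job whose output will total about 1 TB, targeted at roughly 2 GB per file, wants on the order of 500 reducers:

    # ~1000 GB of output / ~2 GB per file  =>  about 500 reducers
    hadoop jar [...] -D mapred.reduce.tasks=500 [...rest of launch command...]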

  • Number of mappers: by default, Hadoop will launch one mapper per HDFS block; it won’t assign more than one file to each mapper [4]. More than a few thousand map tasks in a single job starts to strain the jobtracker and swamp real work with task-startup overhead.

  • Reducer efficiency: as explained later (TODO: ref reducer_size), your reducers are most efficient at 0.5 to 2 GB.

  • HDFS block size: >= 1-2 GB — a typically-seen Hadoop block size is 128 MB; as you’ll see later, there’s a good case for even larger block sizes. You’d like each file to hold 4 or more blocks.

  • Your network connection: < 4 GB — a mid-level US internet connection will download a 4 GB file in about 10 minutes, and upload it in about 2 hours.

  • a DVD: < 4 GB — A DVD holds about 4GB. I don’t know if you use DVDs still, but it’s a data point.

  • Cloud file stores: < 5 GB — The Amazon S3 system now allows files greater than 5 GB, but it requires a special multi-part upload transfer.

  • Browsability: a 1 GB file has about a million 1kB records.

Even if you don’t find any of those compelling enough to hang your hat on, I’ll just say that files of 2 GB are large enough to be efficient and small enough to be manageable; they also avoid those upper limits even with natural variance in reduce sizes.

If your dataset is

Mapper-only jobs

There’s a tradeoff:

If you set your min-split-size larger than your block size, you’ll get non-local map tasks, which puts a load on your network.

However, if you let it launch one map task per block, you’ll have two problems. First, one mapper per HDFS block can mean a very large number of tasks: a 1 TB input dataset in 128 MB HDFS blocks requires 8,000 map tasks. Make sure your map task runtimes aren’t swamped by task startup times and that your jobtracker heap size has been configured to handle that task count. Second, if your job is ever-so-slightly expansive — if it turns a 128 MB input block into a 130 MB output file — then you will double the block count of the dataset: twice as many blocks for the namenode to track, and twice the count of mappers in subsequent stages.
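
If you decide the network cost is worth it, the lever is the min split size mentioned earlier (the 1 GB value is just an example):

    # ask for splits of at least 1 GB (the value is in bytes), accepting some non-local map tasks
    hadoop jar [...] -D mapred.min.split.size=1073741824 [...rest of launch command...]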

My recommendation: (TODO: need to re-confirm with numbers; current readers please take with a grain of salt.)

To learn more, see the


1. Unless your cluster is heavily used by multiple people, the default scheduler is fine. If fights start breaking out, quickly consult (TODOREF Hadoop Operations) for guidance on the other choices.
2. Paraphrasing the Hadoop FAQ: to make a 'non-splittable' FileInputFormat, your particular input format should return false for the isSplitable call. If you would like the whole file to be a single record, you must also implement a RecordReader to do so — the default is LineRecordReader, which splits the file into separate lines. The other, quick-fix option is to set mapred.min.split.size to a large enough value.
3. Large variance in counts of reduce keys not only drives up reducer run times, it causes variance in output sizes; but that’s just insult added to injury. Worry about that before you worry about the target file size.
4. Pig has a special option to roll up small files.