Skip to content

Latest commit

 

History

History
193 lines (126 loc) · 13.7 KB

ba2a-simple_stream.asciidoc

File metadata and controls

193 lines (126 loc) · 13.7 KB

The Stream (ch. B)

Chimpanzee and Elephant Start a Business

As you know, chimpanzees love nothing more than sitting at typewriters processing and generating text. Elephants have a prodigious ability to store and recall information, and will carry huge amounts of carge with great determination. The chimpanzees and the elephants realized there was a real business opportunity from combining their strengths, and so they formed the Chimpanzee and Elephant Data Shipping Corporation.

They were soon hired by a publishing firm to translate the works of Shakespeare into every language. In the system they set up, each chimpanzee sits at a typewriter doing exactly one thing well: read a set of passages, and type out the corresponding text in a new language. Each elephant has a pile of books, which she breaks up into "blocks" (a consecutive bundle of pages, tied up with string).

A Simple Streamer

We’re hardly as clever as one of these multilingual chimpanzees, but even we can translate text into Pig Latin. For the unfamiliar, you turn standard English into Pig Latin as follows:

  • If the word begins with a consonant-sounding letter or letters, move them to the end of the word adding "ay": "happy" becomes "appy-hay", "chimp" becomes "imp-chay" and "yes" becomes "es-yay".

  • In words that begin with a vowel, just append the syllable "way": "another" becomes "another-way", "elephant" becomes "elephant-way".

Pig Latin translator, actual version is a program to do that translation. It’s written in Wukong, a simple library to rapidly develop big data analyses. Like the chimpanzees, it is single-concern: there’s nothing in there about loading files, parallelism, network sockets or anything else. Yet you can run it over a text file from the commandline or run it over petabytes on a cluster (should you somehow have a petabyte crying out for pig-latinizing).

Pig Latin translator, actual version
    CONSONANTS   = "bcdfghjklmnpqrstvwxz"
    UPPERCASE_RE = /[A-Z]/
    PIG_LATIN_RE = %r{
      \b                  # word boundary
      ([#{CONSONANTS}]*)  # all initial consonants
      ([\w\']+)           # remaining wordlike characters
      }xi

    each_line do |line|
      latinized = line.gsub(PIG_LATIN_RE) do
        head, tail = [$1, $2]
        head       = 'w' if head.blank?
        tail.capitalize! if head =~ UPPERCASE_RE
        "#{tail}-#{head.downcase}ay"
      end
      yield(latinized)
    end
Pig Latin translator, pseudocode
    for each line,
      recognize each word in the line and change it as follows:
        separate the head consonants (if any) from the tail of the word
	if there were no initial consonants, use 'w' as the head
        give the tail the same capitalization as the word
        change the word to "{tail}-#{head}ay"
      end
      emit the latinized version of the line
    end
Ruby helper
  • The first few lines define "regular expressions" selecting the initial characters (if any) to move. Writing their names in ALL CAPS makes them be constants.

  • Wukong calls the each_line do …​ end block with each line; the |line| part puts it in the line variable.

  • the gsub ("globally substitute") statement calls its do …​ end block with each matched word, and replaces that word with the last line of the block.

  • yield(latinized) hands off the latinized string for wukong to output

To test the program on the commandline, run

wu-local examples/text/pig_latin.rb data/magi.txt -

The last line of its output should look like

Everywhere-way ey-thay are-way isest-way. Ey-thay are-way e-thay agi-may.

So that’s what it looks like when a cat is feeding the program data; let’s see how it works when an elephant is setting the pace.

Chimpanzee and Elephant: A Day at Work

Each day, the chimpanzee’s foreman, a gruff silverback named J.T., hands each chimp the day’s translation manual and a passage to translate as they clock in. Throughout the day, he also coordinates assigning each block of pages to chimps as they signal the need for a fresh assignment.

Some passages are harder than others, so it’s important that any elephant can deliver page blocks to any chimpanzee — otherwise you’d have some chimps goofing off while others are stuck translating King Lear into Kinyarwanda. On the other hand, sending page blocks around arbitrarily will clog the hallways and exhaust the elephants.

The elephants' chief librarian, Nanette, employs several tricks to avoid this congestion.

Since each chimpanzee typically shares a cubicle with an elephant, it’s most convenient to hand a new page block across the desk rather then carry it down the hall. J.T. assigns tasks accordingly, using a manifest of page blocks he requests from Nanette. Together, they’re able to make most tasks be "local".

Second, the page blocks of each play are distributed all around the office, not stored in one book together. One elephant might have pages from Act I of Hamlet, Act II of The Tempest, and the first four scenes of Troilus and Cressida [1]. Also, there are multiple 'replicas' (typically three) of each book collectively on hand. So even if a chimp falls behind, JT can depend that some other colleague will have a cubicle-local replica. (There’s another benefit to having multiple copies: it ensures there’s always a copy available. If one elephant is absent for the day, leaving her desk locked, Nanette will direct someone to make a xerox copy from either of the two other replicas.)

Nanette and J.T. exercise a bunch more savvy optimizations (like handing out the longest passages first, or having folks who finish early pitch in so everyone can go home at the same time, and more). There’s no better demonstration of power through simplicity.

Running a Hadoop Job

Note: this assumes you have a working Hadoop cluster, however large or small.

As you’ve surely guessed, Hadoop is organized very much like the Chimpanzee & Elephant team. Let’s dive in and see it in action.

First, copy the data onto the cluster:

hadoop fs -mkdir ./data
hadoop fs -put   wukong_example_data/text ./data/

These commands understand ./data/text to be a path on the HDFS, not your local disk; the dot . is treated as your HDFS home directory (use it as you would ~ in Unix.). The wu-put command, which takes a list of local paths and copies them to the HDFS, treats its final argument as an HDFS path by default, and all the preceding paths as being local.

First, let’s test on the same tiny little file we used at the commandline. Make sure to notice how much longer it takes this elephant to squash a flea than it took to run without hadoop.

wukong launch examples/text/pig_latin.rb ./data/text/magi.txt ./output/latinized_magi

After outputting a bunch of happy robot-ese to your screen, the job should appear on the jobtracker window within a few seconds. The whole job should complete in far less time than it took to set it up. You can compare its output to the earlier by running

hadoop fs -cat ./output/latinized_magi/\*

Now let’s run it on the full Shakespeare corpus. Even this is hardly enough data to make Hadoop break a sweat, but it does show off the power of distributed computing.

wukong launch examples/text/pig_latin.rb ./data/text/magi.txt ./output/latinized_magi

Brief Anatomy of a Hadoop Job

We’ll go into much more detail in (TODO: ref), but here are the essentials of what you just performed.

Copying files to the HDFS

When you ran the hadoop fs -mkdir command, the Namenode (Nanette’s Hadoop counterpart) simply made a notation in its directory: no data was stored. If you’re familiar with the term, think of the namenode as a 'File Allocation Table (FAT)' for the HDFS.

When you run hadoop fs -put …​, the putter process does the following for each file:

  1. Contacts the namenode to create the file. This also just makes a note of the file; the namenode doesn’t ever have actual data pass through it.

  2. Instead, the putter process asks the namenode to allocate a new data block. The namenode designates a set of datanodes (typically three), along with a permanently-unique block ID.

  3. The putter process transfers the file over the network to the first data node in the set; that datanode transfers its contents to the next one, and so forth. The putter doesn’t consider its job done until a full set of replicas have acknowledged successful receipt.

  4. As soon as each HDFS block fills, even if it is mid-record, it is closed; steps 2 and 3 are repeated for the next block.

Running on the cluster

Now let’s look at what happens when you run your job.

(TODO: verify this is true in detail. @esammer?)

  • Runner: The program you launch sends the job and its assets (code files, etc) to the jobtracker. The jobtracker hands a job_id back (something like job_201204010203_0002 — the datetime the jobtracker started and the count of jobs launched so far); you’ll use this to monitor and if necessary kill the job.

  • Jobtracker: As tasktrackers "heartbeat" in, the jobtracker hands them a set of 'task’s — the code to run and the data segment to process (the "split", typically an HDFS block).

  • Tasktracker: each tasktracker launches a set of 'mapper child processes', each one an 'attempt' of the tasks it received. (TODO verify:) It periodically reassures the jobtracker with progress and in-app metrics.

  • Jobtracker: the Jobtracker continually updates the job progress and app metrics. As each tasktracker reports a complete attempt, it receives a new one from the jobtracker.

  • Tasktracker: after some progress, the tasktrackers also fire off a set of reducer attempts, similar to the mapper step.

  • Runner: stays alive, reporting progress, for the full duration of the job. As soon as the job_id is delivered, though, the Hadoop job itself doesn’t depend on the runner — even if you stop the process or disconnect your terminal the job will continue to run.

Warning

Please keep in mind that the tasktracker does not run your code directly — it forks a separate process in a separate JVM with its own memory demands. The tasktracker rarely needs more than a few hundred megabytes of heap, and you should not see it consuming significant I/O or CPU.

Chimpanzee and Elephant: Splits

I’ve danced around a minor but important detail that the workers take care of. For the Chimpanzees, books are chopped up into set numbers of pages — but the chimps translate sentences, not pages, and a page block boundary might happen mid-sentence.

The Hadoop equivalent of course is that a data record may cross and HDFS block boundary. (In fact, you can force map-reduce splits to happen anywhere in the file, but the default and typically most-efficient choice is to split at HDFS blocks.)

A mapper will skip the first record of a split if it’s partial and carry on from there. Since there are many records in each split, that’s no big deal. When it gets to the end of the split, the task doesn’t stop processing until is completes the current record — the framework makes the overhanging data seamlessley appear.

In practice, Hadoop users only need to worry about record splitting when writing a custom InputFormat or when practicing advanced magick. You’ll see lots of reference to it though — it’s a crucial subject for those inside the framework, but for regular users the story I just told is more than enough detail.

Exercises

Exercise 1.1: Running time

It’s important to build your intuition about what makes a program fast or slow.

Write the following scripts:

  • null.rb  — emits nothing.

  • identity.rb  — emits every line exactly as it was read in.

Let’s run the reverse.rb and piglatin.rb scripts from this chapter, and the null.rb and identity.rb scripts from exercise 1.1, against the 30 Million Wikipedia Abstracts dataset.

First, though, write down an educated guess for how much longer each script will take than the null.rb script takes (use the table below). So, if you think the reverse.rb script will be 10% slower, write '10%'; if you think it will be 10% faster, write '- 10%'.

Next, run each script three times, mixing up the order. Write down

  • the total time of each run

  • the average of those times

  • the actual percentage difference in run time between each script and the null.rb script

    script     | est % incr | run 1 | run 2 | run 3 | avg run time | actual % incr |
    null:      |            |       |       |       |              |               |
    identity:  |            |       |       |       |              |               |
    reverse:   |            |       |       |       |              |               |
    pig_latin: |            |       |       |       |              |               |

Most people are surprised by the result.

Exercise 1.2: A Petabyte-scale wc command

Create a script, wc.rb, that emit the length of each line, the count of bytes it occupies, and the number of words it contains.

Notes:

  • The String methods chomp, length, bytesize, split are useful here.

  • Do not include the end-of-line characters (\n or \r) in your count.

  • As a reminder — for English text the byte count and length are typically similar, but the funny characters in a string like "Iñtërnâtiônàlizætiøn" require more than one byte each. The character count says how many distinct 'letters' the string contains, regardless of how it’s stored in the computer. The byte count describes how much space a string occupies, and depends on arcane details of how strings are stored.


1. Does that sound complicated? It is — Nanette is able to keep track of all those blocks, but if she calls in sick, nobody can get anything done. You do NOT want Nanette to call in sick.