ISSNet summer school

This wiki page hosts the instructions for the CIEL practical session at the ISSNet summer school, on 14/7/2011 in Calgary, AB. I will be updating this page during the session to correct any errors or omissions.

Connecting to your personal cluster

You should have received an email containing a link to the details of your personal cluster and access credentials. Let me know if you did not receive it, and I will provide it by other means. The following steps will connect you to your personal cluster:

N.B. The URLs for your private key and hostnames file are password protected. To use them with curl, you will need to type http://USERNAME:PASSWORD@HOSTNAME/PATH, substituting appropriately.

    laptop$ mkdir issnet-ciel
    laptop$ cd issnet-ciel
    laptop$ curl URL_TO_YOUR_PRIVATE_KEY > ec2-key
    laptop$ chmod 600 ec2-key
    laptop$ curl URL_TO_YOUR_HOSTNAMES_FILE > ec2-hostnames
    laptop$ master=`head -n 1 ec2-hostnames`
    laptop$ scp -i ec2-key ec2-hostnames ec2-key root@$master:/opt/skywriting/
    laptop$ ssh -i ec2-key root@$master
    master# 

You are now connected to an EC2 instance, running Ubuntu 10.04. Since this is a virtual machine, you have full root access, so you may want to install some additional packages using apt-get install. (For example, I typically install emacs and lynx on the master, but your taste may vary.)
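
For instance, to install those two packages:

    master# apt-get update
    master# apt-get install emacs lynx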

Launching a CIEL cluster

Given a list of hostnames, the CIEL cluster management scripts (sw-launch-cluster, sw-update-cluster and sw-kill-cluster) are designed to launch the master on the first host, and workers on the remainder of the hosts. You can run the cluster management scripts from any host that has a private key for accessing your cluster hosts, but in this tutorial I will assume that you are running them from the master (i.e. the host named on the first line of your hostnames file).

To launch your cluster, type the following (assuming that your current directory is /opt/skywriting):

    master# scripts/sw-launch-cluster -f ec2-hostnames -i ec2-key

You should see some initial logging output on your terminal. To confirm that your cluster has started successfully, wait a few seconds then type the following:

    master# curl http://localhost:8000/control/worker/ | grep netloc | wc -l

The result should be the number of workers in your cluster (i.e. the number of lines in ec2-hostnames, minus one for the master). At this point, if the result is zero, there is a problem with your configuration, so let me know. If the result is fewer than you were expecting, carry on for the moment, because you should still be able to run tasks on some of the machines, and I will take a look at your setup later on.

It may be necessary to update your cluster software during the session. In that case, use the following commands:

    master# scripts/sw-kill-cluster -f ec2-hostnames -i ec2-key
    ... wait a few seconds ...
    master# scripts/sw-update-cluster -f ec2-hostnames -i ec2-key
    ... wait a few seconds, until the messages from git stop appearing ...
    master# scripts/sw-launch-cluster -f ec2-hostnames -i ec2-key

Hello world in Skywriting

The simplest Skywriting script returns a value directly. Create a file called helloworld.sw with the following contents:

    return "Hello, world!";

To run the script, type the following command:

    master# scripts/sw-job -m http://`hostname -f`:8000/ helloworld.sw

The sw-job script is a utility for launching simple Skywriting scripts that do not have any external dependencies. We will use it to launch the Skywriting scripts in the remainder of this section. Typing the -m flag each time rapidly becomes tedious, so you can avoid it by setting the CIEL_MASTER environment variable:

    master# export CIEL_MASTER=http://`hostname -f`:8000/
    master# scripts/sw-job helloworld.sw

A more realistic Skywriting script will spawn() one or more tasks. Create a file called helloworld2.sw with the following contents:

    function hello(who) {
        return "Hello, " + who + "!";
    }

    salutation = spawn(hello, ["world"]);

    return *salutation;
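
Run this script in the same way; since CIEL_MASTER is set, no -m flag is needed:

    master# scripts/sw-job helloworld2.sw

The spawn() call creates a new task that evaluates hello("world") and returns a reference to its result, which the final statement dereferences with the * operator.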

Using non-Skywriting executors

The next example shows how a script can make use of non-Skywriting executors. Create a file called linecount.sw with the following contents (but don't attempt to run it just yet):

    include "stdinout";
    lines = stdinout([package("input1")], ["wc", "-l"]);
    return *lines;

The stdinout function invokes the stdinout executor to integrate legacy applications into a CIEL job. The function takes a list of /references/ and a command line (as a list of strings). In the above example, the single input reference is the value of package("input1"), and the task executes wc -l on that reference. (When more than one reference is specified, the contents of those references are concatenated together, as if they were multiple inputs to cat.) The implementation of the stdinout function can be found in src/sw/stdlib/stdinout. Since the result of wc -l is an integer (and hence valid JSON), we can dereference it using the * operator.
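
For instance, the following (illustrative) call would run wc -l over the concatenation of two packaged inputs, yielding a single combined line count:

    lines = stdinout([package("input1"), package("input2")], ["wc", "-l"]);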

How does Skywriting resolve package("input1")? The answer is that it must be supplied as an /external reference/. In general, the job package mechanism is used to provide a key-value dictionary of files, URLs, lists of files and lists of URLs. However, in this case, we can use the sw-job -p option to define the package manually:

    master# apt-get install wcanadian-insane wbritish-insane wamerican
    master# scripts/sw-job linecount.sw -p input1 /usr/share/dict/words

Let's now try a script that performs some analysis in parallel. Create a file called linecount-parallel.sw with the following contents:

    include "stdinout";
    inputs = [package("input1"), package("input2"), package("input3")];

    results = [];
    for (input in inputs) {
        results += stdinout([input], ["wc", "-l"]);
    }
  
    total = 0;
    for (count in results) {
        total += *count;
    }

    return total;

To run this script, type the following command:

    master# scripts/sw-job linecount-parallel.sw -p input1 /usr/share/dict/canadian-english-insane \
                                                 -p input2 /usr/share/dict/british-english-insane \
                                                 -p input3 /usr/share/dict/american-english

The above script is a simple example of a MapReduce coordination pattern: the lines are counted in parallel, then reduced (added together). In the following section, we will see how to build a more general form of MapReduce from scratch.

Implementing MapReduce

We have just seen how to implement a simple form of MapReduce using a combination of command-line utilities and Skywriting scripts. However, in general, this combination does not offer a sufficiently comprehensive collection of libraries for developing realistic applications. In this section, you are going to implement a fuller version of MapReduce, using Python. (If you aren't happy using Python, you can use any programming language of your choice for this part of the practical. However, you will have to implement the simple interface for accessing task inputs and outputs, described under "The fcpy interface" below.)

I have provided a simple test harness that creates the appropriate task graph, in the examples/MapReduce directory. The mapreduce.sw script is as follows (note that you should not have to edit this, at least at first):

include "mapreduce";
include "environ";
include "stdinout";
include "sync";

These include statements allow functions from the "standard library" to be used in the script. The corresponding implementations are in src/sw/stdlib/*.

    function make_environ_map_task(num_reducers) {
        return function(map_input) {
            return environ([map_input], [package("map-bin")], num_reducers);
        };
    }

    function make_environ_reduce_task() {
        return function(reduce_inputs) {
            return environ(reduce_inputs, [package("reduce-bin")], 1)[0];
        };
    }

These higher-order functions are constructors for the map and reduce tasks. They define the template for each kind of task: a map task takes a single input and produces num_reducers outputs; a reduce task takes many inputs (one per map task) and produces a single output. The environ function (and the corresponding env executor) is similar to stdinout, except that it provides individual access to multiple inputs and outputs, using environment variables as an indirection.
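
Concretely, the env executor passes each task the names of two index files in the INPUT_FILES and OUTPUT_FILES environment variables; each index file lists one filename per line, naming the task's inputs and outputs respectively. This is the convention that the fcpy module (shown at the end of this page) implements.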

    inputs = *package("input-files");
    num_reducers = int(env["NUM_REDUCERS"]);

These are the input parameters. Notably, package("input-files") is a reference to a /list of references/, which makes it simpler to include variable-length inputs.

    results = mapreduce(inputs, make_environ_map_task(num_reducers), make_environ_reduce_task(), num_reducers);

    catted_results = stdinout(results, ["/bin/cat"]);

    return sync([catted_results]);

These statements perform the execution: first a MapReduce graph is built using the task constructors, then the results of the reduce tasks are catted together, and finally a sync task is used to force execution of the whole graph.
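
For reference, the mapreduce() function from the standard library composes this graph roughly as follows. This is a simplified sketch of the coordination pattern, not the exact code (see src/sw/stdlib/mapreduce for the real implementation): each map task is applied to one input and yields a list of num_reducers outputs, and reducer j then consumes the j-th output of every map task.

    function mapreduce(inputs, map_task, reduce_task, num_reducers) {
        map_outputs = [];
        for (input in inputs) {
            map_outputs += map_task(input);
        }

        reduce_outputs = [];
        for (j in range(num_reducers)) {
            reduce_inputs = [];
            for (map_output in map_outputs) {
                reduce_inputs += map_output[j];
            }
            reduce_outputs += reduce_task(reduce_inputs);
        }
        return reduce_outputs;
    }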

To execute a MapReduce-style job, you need to invoke a package (rather than the script directly). This is done as follows:

    master# scripts/sw-start-job examples/MapReduce/mapreduce.pack \
                                 PATH_TO_MAPPER_EXECUTABLE \
                                 PATH_TO_REDUCER_EXECUTABLE \
                                 PATH_TO_INPUT_INDEX \
                                 [NUMBER_OF_REDUCERS=1]

The PATH_TO_INPUT_INDEX is the name of a file containing a list of filenames (one per line), for each of which a mapper will be created. The mapper and reducer executables can be simple Python scripts, for example:

    master# echo /usr/share/dict/american-english > dicts.txt
    master# echo /usr/share/dict/british-english-insane >> dicts.txt
    master# echo /usr/share/dict/canadian-english-insane >> dicts.txt
    master# scripts/sw-start-job examples/MapReduce/mapreduce.pack \
                                 examples/MapReduce/src/python/count_lines.py \
                                 examples/MapReduce/src/python/total.py \
                                 dicts.txt

This performs the same calculation as the example using stdinout and Skywriting. However, you have a lot more flexibility as to what you can do in the Python scripts.

At this point, try writing Python mapper and reducer scripts to perform a more interesting calculation. Using examples/MapReduce/src/python/count_lines.py and examples/MapReduce/src/python/total.py as starting points, try to write a program that calculates the probability of different /k/-grams in a text corpus, where /k/ is a constant number of characters. In the mapper, you will have to do the following:

  1. Loop through the input file, identifying all of the subsequences of length /k/, and update a data structure holding the counts.
  2. Serialise the data structure (perhaps as plain text, to simplify matters) to one or more of the mapper's outputs.

In the reducer, you will have to do the following:

  1. Read and deserialise the data structures from each mapper, and update a global data structure.
  2. Serialise the data structure (as plain text) to the single reducer output.

examples/MapReduce/src/python/count_lines.py illustrates how to read from inputs and write to outputs: fcpy.inputs is a list of file-like objects that are open for reading, and fcpy.outputs is a list of file-like objects that are open for writing. Since you are dealing with text, the fcpy.inputs[i].readlines() function will be useful for reading input, and the fcpy.outputs[i].write() function will be useful for writing output.
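
To get you started, here is a minimal sketch of a mapper along these lines. It assumes (like count_lines.py) that the fcpy module can be imported by the task; the value of k, the hash-based partitioning across outputs and the plain-text serialisation format are all illustrative choices rather than requirements:

    import fcpy

    K = 3   # illustrative k-gram length

    def main():
        # Configure fcpy.inputs and fcpy.outputs from the task's environment.
        fcpy.init()

        # 1. Count every subsequence of length K in the input.
        counts = {}
        for infile in fcpy.inputs:
            for line in infile.readlines():
                line = line.strip()
                for i in range(len(line) - K + 1):
                    kgram = line[i:i+K]
                    counts[kgram] = counts.get(kgram, 0) + 1

        # 2. Serialise the counts as plain text, partitioning the k-grams
        #    across this mapper's outputs (one per reducer) by hash.
        num_outputs = len(fcpy.outputs)
        for kgram, count in counts.items():
            fcpy.outputs[hash(kgram) % num_outputs].write("%s\t%d\n" % (kgram, count))

    if __name__ == "__main__":
        main()

The corresponding reducer would read these lines from each of its inputs (one per mapper), sum the counts for each k-gram, and write the normalised probabilities to its single output.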

By now, you should know enough about Skywriting and CIEL to implement your own applications. If any time remains, you might want to try the following exercises:

  • A real MapReduce provides an emit() function that takes an arbitrary key-value pair and writes it out to the appropriate reducer (one of many). Can you implement the data structures that are necessary to do this?
  • It is common to pre-sort the output from the mappers in order to lower the memory requirements at the reducers (allowing them to perform a streaming mergesort). Try to implement this in your emit() function and reducer.
  • Often, the reducer performs a commutative and associative aggregation, which means that the reduce function can be applied early to mapper outputs---this is usually called a /combiner/. Can you implement a combiner for your mapper tasks?
  • CIEL supports a greater diversity of patterns than simple MapReduce. For example, it is possible for MapReduce tasks to have more than one input. By modifying the Skywriting script mapreduce.sw, can you add a secondary data set to the mappers? This would allow you to perform a Cartesian product, which underlies many data analysis algorithms, such as arbitrary joins. Can you implement other join algorithms, such as hash-joins and broadcast joins?
  • The key feature of CIEL is its ability to perform data-dependent control flow. Therefore, it is often used for algorithms that iterate until a convergence criterion is reached. Can you implement an iterative algorithm using your MapReduce primitives? For example, try generating a random graph, then implementing the PageRank or Single-Source Shortest Paths algorithms on it.

The fcpy interface

If you want to implement this part of the practical in another language, you will need to implement the equivalent of the fcpy interface:

    import os
    import sys

    # Task inputs are accessed through fcpy.inputs; similarly for outputs.
    inputs = None
    outputs = None

    def init():
        """Must be called at the start of a task to configure the environment."""
        global inputs, outputs

        try:
            # $INPUT_FILES contains a single filename; similarly for $OUTPUT_FILES.
            input_files = os.getenv("INPUT_FILES")
            output_files = os.getenv("OUTPUT_FILES")

            if input_files is None or output_files is None:
                raise KeyError()

            # Each line of `cat $INPUT_FILES` is a filename corresponding to one of the task inputs.
            with open(input_files) as f:
                inputs = [open(x.strip(), 'r') for x in f.readlines()]

            # Each line of `cat $OUTPUT_FILES` is a filename corresponding to one of the task outputs.
            with open(output_files) as f:
                outputs = [open(x.strip(), 'w') for x in f.readlines()]

        except KeyError:
            # If the environment variables are missing, the task was presumably not
            # started by the env executor: report the error and exit.
            sys.stderr.write("fcpy.init(): INPUT_FILES/OUTPUT_FILES not set\n")
            sys.exit(1)