First let’s assemble our data model:
link:code/server_logs/server_logs-00-model-base.rb[role=include]
We should also add a method to the model to parse each line. It just breaks up each line into fields and passes them in order to the Logline
model:
link:code/server_logs/server_logs-01-parse-script.rb[role=include]
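If you don’t have the sample code in front of you, the parse method looks something like the following sketch; the regexp name and the constructor call are assumptions, and the linked model file is the real thing.

[source,ruby]
----
class Logline
  # Sketch only: LOG_RE stands in for the log-format regexp defined in the
  # model, and passing the captures straight to the constructor is an assumption.
  def self.parse(line)
    mm = LOG_RE.match(line.chomp) or return nil  # skip lines that don't match
    new(*mm.captures)                            # hand the fields over, in order
  end
end
----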
Now the parse script is as simple as can be: take each line, hand it to Logline
to parse, and emit the model object it returns. We don’t need to do anything more, so there’s no reducer:
link:code/server_logs/server_logs-01-parse-script.rb[role=include]
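In the same spirit, and in the same style as the processors you’ll meet later in this chapter, a map-only parse script is roughly the following sketch; the processor name is an assumption:

[source,ruby]
----
processor :parse_loglines do
  def process(line)
    logline = Logline.parse(line) or return  # silently drop unparseable lines
    yield logline                            # emit the model object
  end
end

flow(:mapper){ input > parse_loglines }      # map-only: no reducer stage
----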
There are a couple more gory details of parsing each log line, but trust me on them for a moment, and let’s run this script. First, in local map mode; here are lines 5000-5010 of the logs:
link:code/server_logs/server_logs-00-parse-00-mapper.log[role=include]
I’ve used the `//x` form of regular expression, which ignores whitespace and treats `#` as introducing a comment. So at the cost of having to explicitly use `\s` for "space", we get a very readable program:
link:code/server_logs/server_logs-00-model-regexp.rb[role=include]
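For instance, here’s a toy excerpt in the same style (not the actual log-format regexp from the linked model file):

[source,ruby]
----
# Whitespace is ignored and '#' starts a comment, so each capture group
# can carry its own explanation; a literal space must be written as \s.
TOY_RE = %r{\A
  (\d+\.\d+\.\d+\.\d+)   # ip address: four dot-separated runs of digits
  \s                     # single space
  (\S+)                  # identd field, almost always just '-'
  \z}x
----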
The second important detail concerns parsing the time.
link:code/server_logs/server_logs-00-model-detail.rb[role=include]
- Don’t try to be a hero and get everything done in one regexp. You’ve gone from gallant to goofus for sure if you start using backreferences. Here, we use the first regexp just to separate the date-time string from the herd, then finish it in a clean, quiet place.
- See the wire receiver pattern for why we use `receive_visit_time` rather than overriding the setter (`visit_time=`) method. A receive method must either call `super(val)` to save the value, or call `write_attribute` directly.
- Generic parsing of times can be shockingly slow. The version you see above came after profiling the script verified it was better to pull the timestamp apart by hand than to hand it to a general-purpose date parser (see the sketch just after this list).
- The main regexp and the one in `receive_visit_time` are good and brittle: they spell out `HTTP`, specify digits with `\d`, and so forth, and they are anchored at the beginning and end (the multi-line anchors `^` and `$` match at a newline; `\A` and `\z` match strictly at the beginning and end of a string).
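To make the time-parsing point concrete, here’s a rough sketch of a hand-rolled `receive_visit_time`; the month lookup table and the `Time.new` call are our assumptions, and the linked model file remains the authoritative version.

[source,ruby]
----
class Logline
  MONTHS = { 'Jan' => 1, 'Feb' => 2, 'Mar' => 3, 'Apr' => 4,  'May' => 5,  'Jun' => 6,
             'Jul' => 7, 'Aug' => 8, 'Sep' => 9, 'Oct' => 10, 'Nov' => 11, 'Dec' => 12 }

  # Apache timestamps look like "12/Nov/2012:06:25:48 -0500". Rather than hand
  # the whole string to a general-purpose date parser, pull the pieces out with
  # a strict, anchored regexp and build the Time directly.
  def receive_visit_time(val)
    mm = %r{\A(\d{2})/(\w{3})/(\d{4}):(\d{2}):(\d{2}):(\d{2}) ([\+\-]\d{4})\z}.match(val) or return
    day, mo, yr, hh, mi, ss, tz = mm.captures
    # as noted above, a receive method hands the finished value to super (or write_attribute)
    super(Time.new(yr.to_i, MONTHS[mo], day.to_i, hh.to_i, mi.to_i, ss.to_i, tz.insert(3, ':')))
  end
end
----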
When do people visit the site?
link:code/server_logs/server_logs-02-histograms-mapper.rb[role=include]
We want to group on `day_hr`, so just add a method implementing (and documenting!) that domain knowledge.
link:code/server_logs/server_logs-00-model-day_hr.rb[role=include]
This is the advantage of having a model and not just a passive sack of data.
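As a sketch, the method might look like the following; the exact bucketing and format string are guesses, and the linked model file is authoritative.

[source,ruby]
----
class Logline
  # One plausible bucketing: hour of day within day of week, e.g. "Tue 06".
  def day_hr
    visit_time.strftime('%a %H')
  end
end
----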
Run it in map mode:
link:code/server_logs/server_logs-02-histograms-02-mapper-wu-lign-sort.log[role=include]
TODO: digression about `wu-lign`.
Sort and save the map output; then write and debug your reducer.
link:code/server_logs/server_logs-02-histograms-full.rb[role=include]
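If it helps to see the reducer’s job stripped of any framework, here’s the idea in plain Ruby (this is not Wukong’s API, just the shape of the work): the map output arrives sorted, so counting a key is a matter of counting the consecutive lines that share it.

[source,ruby]
----
last_key, count = nil, 0
ARGF.each do |line|
  key, _rest = line.chomp.split("\t", 2)
  if key != last_key
    puts [last_key, count].join("\t") unless last_key.nil?  # flush the finished key
    last_key, count = key, 0
  end
  count += 1
end
puts [last_key, count].join("\t") unless last_key.nil?      # flush the final key
----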
When things are working, this is what you’ll see. Notice that the `…/Star_Wars_Kid.wmv` file already has five times the pageviews of the site root (`/`).
link:code/server_logs/server_logs-02-histograms-03-reduce.log[role=include]
You’re ready to run the script in the cloud! Fire it off and you’ll see dozens of workers start processing the data.
link:code/server_logs/server_logs-02-histograms-04-freals.log[role=include]
Now let’s do something more challenging than you’d try in a normal relational database.
NOTE: What are the important locality feature(s) to group on?
We spit out `[ip, day_hr, visit_time, path]`, leading with the visitor’s IP address so that all of one visitor’s pageviews group together.
link:code/server_logs/server_logs-03-breadcrumbs-full.rb[role=include]
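The mapper side of that script amounts to re-emitting each parsed visit as that tuple; here it is sketched in the style of the other processors (the processor name and field accessors are assumptions):

[source,ruby]
----
processor :visit_breadcrumbs do
  def process(logline)
    # lead with the ip so all of one visitor's pageviews land together
    yield [logline.ip, logline.day_hr, logline.visit_time, logline.path]
  end
end
----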
Run it in map mode:
link:code/server_logs/server_logs-02-histograms-01-mapper.log[role=include]
link:code/server_logs/server_logs-03-breadcrumbs-02-mapper.log[role=include]
Group on the user:
link:code/server_logs/server_logs-03-breadcrumbs-03-reducer.log[role=include]
We use the secondary sort so that each visit is in strict order of time within a session.
You might ask why that is necessary — surely each mapper reads the lines in order? Yes, but you have no control over what order the mappers run, or where their input begins and ends.
This script will accumulate multiple visits of a page.
TODO: say more about the secondary sort.
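Again stripped of the framework, the reducer’s job looks roughly like this in plain Ruby: because the rows are partitioned on ip and secondary-sorted on time, each visitor’s pageviews arrive together and in order, so we just collect paths until the ip changes. (This is a sketch, not the linked reducer.)

[source,ruby]
----
trail_ip, trail = nil, []
ARGF.each do |line|
  ip, _day_hr, _visit_time, path = line.chomp.split("\t")
  if ip != trail_ip
    puts ([trail_ip] + trail).join("\t") unless trail_ip.nil?  # emit the finished trail
    trail_ip, trail = ip, []
  end
  trail << path
end
puts ([trail_ip] + trail).join("\t") unless trail_ip.nil?
----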
What can you do with the sessionized logs? Well, each row lists a user on the left and a bunch of pages on the right.
We’ve been thinking about that as a table, but it’s also a graph!
link:code/server_logs/server_logs-04-page_page_edges-full.rb[role=include]
link:code/server_logs/server_logs-04-page_page_edges-03-reducer.log[role=include]
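The essential move, with hypothetical names, is to walk each session’s page list and emit an edge for every consecutive pair of pageviews; the linked script may instead emit all pairs or attach weights.

[source,ruby]
----
def emit_edges(pages)
  pages.each_cons(2) do |from_page, to_page|  # consecutive pageviews become an edge
    yield [from_page, to_page]
  end
end

# emit_edges(%w[/ /archive /Star_Wars_Kid.wmv]) { |edge| puts edge.join("\t") }
# prints "/<TAB>/archive" and "/archive<TAB>/Star_Wars_Kid.wmv"
----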
You’ll learn more about this in the chapter on Processing Graphs, but notice how little work it took to turn the sessionized table into a graph of pages.
You can learn a lot about your site’s audience in aggregate by mapping IP addresses to geolocation: not just on its own, but joined against other datasets, like census data, store locations, weather and time. [1]
MaxMind makes their GeoLite IP-to-geo database available under an open license (CC-BY-SA)[2]. Out of the box, its columns are `beg_ip`, `end_ip`, and `location_id`, where the first two columns give the low and high ends (inclusive) of a range that maps to that location. Every address lies in at most one range; locations may have multiple ranges.
This arrangement caters to range queries in a relational database, but isn’t suitable for our needs. A single IP-geo block can span thousands of addresses.
To get the right locality, take each range and break it at some block level. Instead of having `1.2.3.4` to `1.2.5.6` on one line, let’s use the first three quads (the first 24 bits) and emit rows for `1.2.3.4` to `1.2.3.255`, `1.2.4.0` to `1.2.4.255`, and `1.2.5.0` to `1.2.5.6`. This lets us use the first segment as the partition key, and the full IP address as the sort key.
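Here is a sketch of that splitting step, with made-up helper names; the real script may differ in details.

[source,ruby]
----
def ip_to_int(ip)
  ip.split('.').map(&:to_i).inject(0){ |acc, oct| (acc << 8) | oct }
end

def int_to_ip(num)
  [24, 16, 8, 0].map{ |shift| (num >> shift) & 0xFF }.join('.')
end

# Break [beg_ip, end_ip] into pieces that each lie within a single /24 block.
def split_range_on_24s(beg_ip, end_ip)
  beg_i, end_i = ip_to_int(beg_ip), ip_to_int(end_ip)
  pieces = []
  while beg_i <= end_i
    piece_end = [beg_i | 0xFF, end_i].min   # end of this /24 block, or of the range
    pieces << [int_to_ip(beg_i), int_to_ip(piece_end)]
    beg_i = piece_end + 1
  end
  pieces
end

# split_range_on_24s('1.2.3.4', '1.2.5.6')
# # => [["1.2.3.4", "1.2.3.255"], ["1.2.4.0", "1.2.4.255"], ["1.2.5.0", "1.2.5.6"]]
----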
[options="header"]
|===
| lines | bytes | description | file

| 15_288_766 | 1_094_541_688 | 24-bit partition key | maxmind-geolite_city-20121002.tsv
| 2_288_690  | 183_223_435   | 16-bit partition key | maxmind-geolite_city-20121002-16.tsv
| 2_256_627  | 75_729_432    | original (not denormalized) | GeoLiteCity-Blocks.csv
|===
This is a generally applicable approach for doing range queries.
- Choose a regular interval, fine enough to avoid skew but coarse enough to avoid ballooning the dataset size.
- Wherever a range crosses an interval boundary, split it into multiple records, each filling or lying within a single interval.
- Emit a compound key of `[interval, join_handle, beg, end]`, where `interval` is the regular interval the record falls within, and `join_handle` identifies the originating table, so that records are grouped for a join (this is what ensures the records to be joined arrive in the same group; see the sketch after this list). If the interval is transparently a prefix of the index (as it is here), you can instead just ship the remainder: `[interval, join_handle, beg_suffix, end_suffix]`.
- Use the interval as the partition key, and the full compound key as the sort key.
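Here is a hypothetical illustration of those compound keys (the field names and join handles are ours, not the book’s script); picking handles that sort the reference table ahead of the log records within each group is a common trick.

[source,ruby]
----
def geo_block_key(piece_beg, piece_end)
  interval = piece_beg.split('.')[0, 3].join('.')  # the /24 prefix, e.g. "1.2.3"
  [interval, 'A-geo', piece_beg, piece_end]        # 'A-geo' sorts before 'B-log'
end

def visit_key(ip)
  interval = ip.split('.')[0, 3].join('.')
  [interval, 'B-log', ip, ip]                      # a lone address is a trivial range
end
----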
In the geodata section, the "quadtile" scheme is (if you bend your brain right) something of an extension on this idea — instead of splitting ranges on regular intervals, we’ll split regions on a regular grid scheme.
Hadoop is engineered to consume the full capacity of every available resource, up to the currently-limiting one. So in general, you should never issue requests against external services from a Hadoop job: one-by-one queries against a database, crawling web pages, requests to an external API. The resulting load spike is effectively what web security folks call a "DDoS", or distributed denial-of-service attack.
Unless of course you are trying to test a service for resilience against an adversarial DDoS — in which case that assault is a feature, not a bug!
[source,ruby]
----
require 'faraday'

processor :elephant_stampede do

  def process(logline)
    beg_at = Time.now.to_f
    resp   = Faraday.get url_to_fetch(logline)
    yield summarize(resp, beg_at)
  end

  def summarize(resp, beg_at)
    duration = Time.now.to_f - beg_at
    bytesize = resp.body.bytesize
    { duration: duration, bytesize: bytesize }
  end

  def url_to_fetch(logline)
    logline.url
  end
end

flow(:mapper){ input > parse_loglines > elephant_stampede }
----
You must use Wukong’s eventmachine bindings to make more than one simultaneous request per mapper.