Merge pull request #10 from tilezen/zerebubuth/outliers

Add outliers command

zerebubuth authored Oct 12, 2018
2 parents b53a04a + 28f26bc commit 87b57a5
Showing 5 changed files with 190 additions and 30 deletions.
35 changes: 35 additions & 0 deletions README.md
@@ -6,6 +6,7 @@ Current scoville commands:
* `proxy`: Serves a treemap visualisation of tiles on a local HTTP server.
* `percentiles`: Calculate the percentile tile sizes for a set of MVT tiles.
* `heatmap`: Serves a heatmap visualisation of tile sizes on a local HTTP server.
* `outliers`: Calculates the tiles with the largest per-layer sizes.

### Info command ###

@@ -126,6 +127,40 @@ This will run a server on [localhost:8000](http://localhost:8000) by default (us
![Screenshot of the heatmap server](doc/heatmap_screenshot.png)


### Outliers command ###

This calculates the largest tiles on a per-layer basis. For example, when run on a list of 1,000 frequently accessed tiles:

```
scoville outliers -j 4 --cache top-1000-tiles.txt 'https://tile.nextzen.org/tilezen/vector/v1/512/all/{z}/{x}/{y}.mvt?api_key=YOUR_API_KEY'
```

It gives something like the following:

```
Layer 'boundaries'
67474 https://tile.nextzen.org/tilezen/vector/v1/512/all/5/17/11.mvt?api_key=YOUR_API_KEY
68731 https://tile.nextzen.org/tilezen/vector/v1/512/all/0/0/0.mvt?api_key=YOUR_API_KEY
92467 https://tile.nextzen.org/tilezen/vector/v1/512/all/4/8/5.mvt?api_key=YOUR_API_KEY
Layer 'buildings'
359667 https://tile.nextzen.org/tilezen/vector/v1/512/all/13/3035/4647.mvt?api_key=YOUR_API_KEY
372929 https://tile.nextzen.org/tilezen/vector/v1/512/all/13/3034/4647.mvt?api_key=YOUR_API_KEY
408946 https://tile.nextzen.org/tilezen/vector/v1/512/all/13/3033/4647.mvt?api_key=YOUR_API_KEY
Layer 'earth'
94603 https://tile.nextzen.org/tilezen/vector/v1/512/all/7/62/44.mvt?api_key=YOUR_API_KEY
98898 https://tile.nextzen.org/tilezen/vector/v1/512/all/7/68/40.mvt?api_key=YOUR_API_KEY
110378 https://tile.nextzen.org/tilezen/vector/v1/512/all/0/0/0.mvt?api_key=YOUR_API_KEY
Layer 'landuse'
191312 https://tile.nextzen.org/tilezen/vector/v1/512/all/9/263/170.mvt?api_key=YOUR_API_KEY
196733 https://tile.nextzen.org/tilezen/vector/v1/512/all/9/262/170.mvt?api_key=YOUR_API_KEY
271852 https://tile.nextzen.org/tilezen/vector/v1/512/all/9/263/169.mvt?api_key=YOUR_API_KEY
...
```

For each layer, it finds the tiles which use the most bytes for that layer. The top tile URLs are listed, grouped by layer, with each line showing the layer's size in bytes and the tile URL. Further investigation can be done by pasting a tile URL into the `info` command, as shown below.
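
For example, the largest `buildings` tile from the sample output above could be inspected with:

```
scoville info 'https://tile.nextzen.org/tilezen/vector/v1/512/all/13/3033/4647.mvt?api_key=YOUR_API_KEY'
```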

By default it outputs the top 3 tiles for each layer, but this can be changed with the `-n` command line option. Runs can be parallelised using the `-j` option, and cached using the `--cache` option (useful if this is not a one-off and you might run several commands against the same tile set).
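
For example, a run using all three options together (values illustrative):

```
scoville outliers -n 10 -j 8 --cache top-1000-tiles.txt 'https://tile.nextzen.org/tilezen/vector/v1/512/all/{z}/{x}/{y}.mvt?api_key=YOUR_API_KEY'
```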

## Install on Ubuntu:

5 changes: 5 additions & 0 deletions requirements.txt
@@ -1,2 +1,7 @@
click
enum34
requests
requests_futures
squarify
msgpack
Pillow
44 changes: 42 additions & 2 deletions scoville/command.py
@@ -95,8 +95,21 @@ def info(mvt_file, kind, d3_json):
    treemap visualisation.
    """

-    with open(mvt_file, 'r') as fh:
-        tile = Tile(fh.read())
    if mvt_file.startswith('http://') or \
       mvt_file.startswith('https://'):
        import requests

        res = requests.get(mvt_file)
        if res.status_code == 200:
            tile = Tile(res.content)
        else:
            click.echo("Failed to fetch tile, status was %r" %
                       (res.status_code,))
            return

    else:
        with open(mvt_file, 'r') as fh:
            tile = Tile(fh.read())

    sizes = {}
    for layer in tile:
@@ -274,6 +287,33 @@ def colour_map(size):
    serve_http(url, port, heatmap)


@cli.command()
@click.argument('tiles_file', required=1)
@click.argument('url', required=1)
@click.option('--cache/--no-cache', default=False, help='Use a cache for '
              'tiles. Can speed up multiple runs considerably.')
@click.option('--nprocs', '-j', default=1, type=int, help='Number of '
              'processes to use to download and do tile size aggregation.')
@click.option('--num-outliers-per-layer', '-n', type=int, default=3,
              help='Number of outliers for each layer to report on.')
def outliers(tiles_file, url, cache, nprocs, num_outliers_per_layer):
    """
    From the distribution of tile coordinates given in TILES_FILE and fetched
    from the URL pattern, pull out some of the outlier tiles which have the
    largest sizes in each layer.
    """

    from scoville.percentiles import calculate_outliers

    tiles = read_urls(tiles_file, url)
    result = calculate_outliers(tiles, num_outliers_per_layer, cache, nprocs)

    for name in sorted(result.keys()):
        click.secho("Layer %r" % name, fg='green', bold=True)
        for size, url in sorted(result[name]):
            click.echo("%8d %s" % (size, url))

def scoville_main():
    cli()

133 changes: 106 additions & 27 deletions scoville/percentiles.py
@@ -55,6 +55,16 @@ def _fetch_cache(url):
    return data


def fetch(url, cache=False):
    """
    Fetch a tile from url, using cache if cache=True.
    """

    if cache:
        return _fetch_cache(url)
    return _fetch_http(url)
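
# A minimal usage sketch (URL illustrative):
#
#   data = fetch('https://tile.example.com/0/0/0.mvt')              # plain HTTP
#   data = fetch('https://tile.example.com/0/0/0.mvt', cache=True)  # disk cache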


class Aggregator(object):
    """
    Core of the algorithm. Fetches tiles and aggregates their total and
@@ -76,33 +86,72 @@ def add(self, tile_url):
        for layer in tile:
            self.results[layer.name].append(layer.size)

    # encode a message to be sent over the "wire" from a worker to the parent
    # process. we use msgpack encoding rather than pickle, as pickle was
    # producing some very large messages.
    def encode(self):
        from msgpack import packb
        return packb(self.results)

-# special object to tell worker threads to exit
-class Sentinel(object):
-    pass
    def merge_decode(self, data):
        from msgpack import unpackb
        results = unpackb(data)
        for k, v in results.iteritems():
            self.results[k].extend(v)
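
# A minimal sketch of the encode/merge_decode round trip between a worker and
# the parent process (cache flag passed positionally, as in the
# Aggregator(cache) calls elsewhere in this diff; URL illustrative):
#
#   parent = Aggregator(False)
#   child = Aggregator(False)
#   child.add('https://tile.example.com/0/0/0.mvt')
#   parent.merge_decode(child.encode())
#   # parent.results now also contains the child's per-layer size samples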


class LargestN(object):
    """
    Keeps a list of the largest N tiles for each layer.
    """

    def __init__(self, num, cache=False):
        self.num = num
        self.fetch_fn = _fetch_http
        if cache:
            self.fetch_fn = _fetch_cache

        self.results = defaultdict(list)

    def _insert(self, name, size, url):
        # keep only the largest `num` (size, url) pairs seen so far for
        # this layer
        largest = self.results.get(name, [])
        largest.append((size, url))
        if len(largest) > self.num:
            largest.sort(reverse=True)
            del largest[self.num:]
        self.results[name] = largest
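
    # For example, with num=3, inserting sizes 5, 9, 2 and then 7 for the same
    # layer leaves [(9, url), (7, url), (5, url)] and drops the smallest, 2.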

-# encode a message to be sent over the "wire" from a worker to the parent
-# process. we use msgpack encoding rather than pickle, as pickle was producing
-# some very large messages.
-def mp_encode(data):
-    from msgpack import packb
-    return packb(data)
    def add(self, tile_url):
        data = self.fetch_fn(tile_url)
        tile = Tile(data)
        for layer in tile:
            self._insert(layer.name, layer.size, tile_url)

    def encode(self):
        from msgpack import packb
        return packb(self.results)

-def mp_decode(data):
-    from msgpack import unpackb
-    return unpackb(data)
    def merge_decode(self, data):
        from msgpack import unpackb
        results = unpackb(data)
        for name, values in results.iteritems():
            for size, url in values:
                self._insert(name, size, url)


-def worker(input_queue, output_queue, cache):
# special object to tell worker threads to exit
class Sentinel(object):
    pass
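# (a dedicated class is used as the shutdown marker, rather than e.g. None,
# so that no legitimate queue item can be mistaken for the exit signal)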


def worker(input_queue, output_queue, factory_fn):
"""
Worker for multi-processing. Reads tasks from a queue and feeds them into
the Aggregator. When all tasks are done it reads a Sentinel and sends the
aggregated result back on the output queue.
"""

agg = Aggregator(cache)
agg = factory_fn()

    while True:
        obj = input_queue.get()
@@ -113,10 +162,10 @@ def worker(input_queue, output_queue, cache):
        agg.add(obj)
        input_queue.task_done()

-    output_queue.put(mp_encode(agg.results))
    output_queue.put(agg.encode())


-def parallel(tile_urls, cache, nprocs):
def parallel(tile_urls, factory_fn, nprocs):
    """
    Fetch percentile data in parallel, using nprocs processes.
@@ -132,7 +181,7 @@ def parallel(tile_urls, cache, nprocs):

    workers = []
    for i in xrange(0, nprocs):
-        w = Process(target=worker, args=(input_queue, output_queue, cache))
        w = Process(target=worker, args=(input_queue, output_queue, factory_fn))
        w.start()
        workers.append(w)

@@ -148,21 +197,19 @@

    # after we've queued the Sentinels, each worker should output an aggregated
    # result on the output queue.
-    result = defaultdict(list)
    agg = factory_fn()
    for i in xrange(0, nprocs):
-        worker_result = mp_decode(output_queue.get())
-        for k, v in worker_result.iteritems():
-            result[k].extend(v)
        agg.merge_decode(output_queue.get())

    # and the worker should have exited, so we can clean up the processes.
    for w in workers:
        w.join()

-    return result
    return agg.results


-def sequential(tile_urls, cache):
-    agg = Aggregator(cache)
def sequential(tile_urls, factory_fn):
    agg = factory_fn()
    for tile_url in tile_urls:
        agg.add(tile_url)
    return agg.results
@@ -183,19 +230,51 @@ def calculate_percentiles(tile_urls, percentiles, cache, nprocs):
    larger number to make concurrent network requests for tiles.
    """

    # check that the input values are in the range we need
    for p in percentiles:
        assert 0 <= p <= 100

    def factory_fn():
        return Aggregator(cache)

    if nprocs > 1:
-        results = parallel(tile_urls, cache, nprocs)
        results = parallel(tile_urls, factory_fn, nprocs)
    else:
-        results = sequential(tile_urls, cache)
        results = sequential(tile_urls, factory_fn)

    pct = {}
    for label, values in results.iteritems():
        values.sort()
        pcts = []
        for p in percentiles:
-            i = int(len(values) * p / 100.0)
            i = min(len(values) - 1, int(len(values) * p / 100.0))
            pcts.append(values[i])

        pct[label] = pcts

    return pct
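
# A worked example of the index clamp above (illustrative numbers): with 1,000
# sorted values, p=50 maps to index 500, while p=100 would map to index 1,000,
# which min() clamps to the last valid index, 999.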


def calculate_outliers(tile_urls, num_outliers, cache, nprocs):
"""
Fetch tiles and calculate the outlier tiles per layer.
The number of outliers is per layer - the largest N.
Cache, if true, uses a local disk cache for the tiles. This can be very
useful if re-running percentile calculations.
Nprocs is the number of processes to use for both fetching and aggregation.
Even on a system with a single CPU, it can be worth setting this to a
larger number to make concurrent nework requests for tiles.
"""

    def factory_fn():
        return LargestN(num_outliers, cache)

    if nprocs > 1:
        results = parallel(tile_urls, factory_fn, nprocs)
    else:
        results = sequential(tile_urls, factory_fn)

    return results
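
# A minimal usage sketch (URLs and layer name illustrative):
#
#   urls = ['https://tile.example.com/%d/0/0.mvt' % z for z in range(3)]
#   top = calculate_outliers(urls, num_outliers=3, cache=False, nprocs=1)
#   # => {'water': [(1234, 'https://tile.example.com/0/0/0.mvt'), ...], ...}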
3 changes: 2 additions & 1 deletion setup.py
@@ -30,10 +30,11 @@
    zip_safe=False,
    install_requires=[
        'click',
-        'PIL',
        'requests',
        'requests_futures',
        'squarify',
        'msgpack',
        'Pillow',
    ],
    entry_points=dict(
        console_scripts=[
