Skip to content

Commit

Permalink
Merge pull request #155 from hotosm/feature/tm_support
Browse files Browse the repository at this point in the history
Feature : Tasking Manager Download Support
  • Loading branch information
kshitijrajsharma authored Nov 1, 2023
2 parents f17c4b9 + 4c5c53e commit 0e3f579
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 101 deletions.
39 changes: 35 additions & 4 deletions API/api_worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import pathlib
import re
import shutil
import time
import zipfile
Expand All @@ -12,6 +13,7 @@
from src.config import ALLOW_BIND_ZIP_FILTER
from src.config import CELERY_BROKER_URL as celery_broker_uri
from src.config import CELERY_RESULT_BACKEND as celery_backend
from src.config import ENABLE_TILES
from src.config import USE_S3_TO_UPLOAD as use_s3_to_upload
from src.config import logger as logging
from src.query_builder.builder import format_file_name_str
Expand All @@ -36,6 +38,13 @@ def process_raw_data(self, params):
if params.output_type
else RawDataOutputType.GEOJSON.value
)
if ENABLE_TILES:
if (
params.output_type == RawDataOutputType.PMTILES.value
or params.output_type == RawDataOutputType.MBTILES.value
):
logging.debug("Using STwithin Logic")
params.use_st_within = True
params.file_name = (
format_file_name_str(params.file_name) if params.file_name else "Export"
)
Expand Down Expand Up @@ -65,15 +74,37 @@ def process_raw_data(self, params):
logging.debug("Zip Binding Done !")
else:
for file_path in pathlib.Path(working_dir).iterdir():
upload_file_path = file_path
inside_file_size += os.path.getsize(file_path)
break # only take one file inside dir , if contains many it should be inside zip
if file_path.is_file() and file_path.name.endswith(
params.output_type.lower()
):
upload_file_path = file_path
inside_file_size += os.path.getsize(file_path)
break # only take one file inside dir , if contains many it should be inside zip
# check if download url will be generated from s3 or not from config
if use_s3_to_upload:
file_transfer_obj = S3FileTransfer()
upload_name = exportname if params.uuid else f"Recurring/{exportname}"
if exportname.startswith("hotosm_project"): # TM
if not params.uuid:
pattern = r"(hotosm_project_)(\d+)"
match = re.match(pattern, exportname)
if match:
prefix = match.group(1)
project_number = match.group(2)
if project_number:
upload_name = f"TM/{project_number}/{exportname}"
elif exportname.startswith("hotosm_"): # HDX
if not params.uuid:
pattern = r"hotosm_([A-Z]{3})_(\w+)"
match = re.match(pattern, exportname)
if match:
iso3countrycode = match.group(1)
if iso3countrycode:
upload_name = f"HDX/{iso3countrycode}/{exportname}"

download_url = file_transfer_obj.upload(
upload_file_path,
exportname if params.uuid else f"Recurring/{exportname}",
upload_name,
file_suffix="zip" if bind_zip else params.output_type.lower(),
)
else:
Expand Down
25 changes: 24 additions & 1 deletion API/raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
from src.config import LIMITER as limiter
from src.config import RATE_LIMIT_PER_MIN as export_rate_limit
from src.config import logger as logging
from src.validation.models import RawDataCurrentParams, SnapshotResponse, StatusResponse
from src.validation.models import (
RawDataCurrentParams,
RawDataCurrentParamsBase,
SnapshotResponse,
StatusResponse,
)

from .api_worker import process_raw_data

Expand Down Expand Up @@ -434,6 +439,24 @@ def get_osm_current_snapshot_as_file(
return JSONResponse({"task_id": task.id, "track_link": f"/tasks/status/{task.id}/"})


@router.get("/snapshot/plain/", response_model=FeatureCollection)
@version(1)
def get_osm_current_snapshot_as_plain_geojson(
    request: Request, params: RawDataCurrentParamsBase
):
    """Return a plain GeoJSON snapshot synchronously (no background task).

    Intended for small polygons (up to 100 sq. km): the extraction runs
    inline and the resulting FeatureCollection is returned directly in the
    response instead of a task-tracking link.

    Args:
        request (Request): incoming HTTP request (unused; kept for FastAPI wiring).
        params (RawDataCurrentParamsBase): same filters as /snapshot/, minus the
            multiple output-format and upload configuration options.

    Returns:
        FeatureCollection: the extracted features as GeoJSON.

    NOTE(review): this route is registered with @router.get but takes a
    Pydantic model, which FastAPI treats as a request *body*; GET requests
    carrying bodies are poorly supported by many clients and proxies --
    confirm whether this endpoint should be a POST like /snapshot/.
    """
    return RawData(params).extract_plain_geojson()


@router.get("/countries/", response_model=FeatureCollection)
@version(1)
def get_countries(q: str = ""):
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
| GeoPackage | :heavy_check_mark: |
| PGDUMP | :heavy_check_mark: |
| GeoJSON | :heavy_check_mark: |
| Pmtiles | :heavy_check_mark: |
| Mbtiles | :heavy_check_mark: |

## Installation

Expand Down Expand Up @@ -90,6 +92,9 @@ Setup the necessary configurations for Raw Data API from [configurations](./docs

Setup config.txt in project root.

## Optional : For Tiles Output
If you opt for tiles output, set ```ENABLE_TILES : True``` in the env variable and make sure you install [Tippecanoe](https://github.com/mapbox/tippecanoe).

### Start the Server

```
Expand Down
7 changes: 7 additions & 0 deletions backend/sql/null_index.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Partial GIN indexes (intarray gin__int_ops opclass) covering only rows
-- whose "country" array is contained by ARRAY[0] -- i.e. rows holding the
-- sentinel country value 0 (or an empty array), presumably features that
-- did not match any country polygon. TODO(review): confirm the meaning of
-- country = ARRAY[0] against the country-index loader.
-- CONCURRENTLY avoids a write-blocking lock but cannot run inside a
-- transaction block, so run each statement separately.
CREATE INDEX CONCURRENTLY ways_poly_country_idx_null ON public.ways_poly USING gin (country gin__int_ops) WHERE country <@ ARRAY[0];

CREATE INDEX CONCURRENTLY ways_line_country_idx_null ON public.ways_line USING gin (country gin__int_ops) WHERE country <@ ARRAY[0];

CREATE INDEX CONCURRENTLY nodes_country_idx_null ON public.nodes USING gin (country gin__int_ops) WHERE country <@ ARRAY[0];

CREATE INDEX CONCURRENTLY relations_country_idx_null ON public.relations USING gin (country gin__int_ops) WHERE country <@ ARRAY[0];
2 changes: 2 additions & 0 deletions docs/src/installation/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The following are the different configuration options that are accepted.
| `EXPORT_MAX_AREA_SQKM` | `EXPORT_MAX_AREA_SQKM` | `[API_CONFIG]` | `100000` | max area in sq. km. to support for rawdata input | OPTIONAL |
| `USE_CONNECTION_POOLING` | `USE_CONNECTION_POOLING` | `[API_CONFIG]` | `false` | Enable psycopg2 connection pooling | OPTIONAL |
| `ALLOW_BIND_ZIP_FILTER` | `ALLOW_BIND_ZIP_FILTER` | `[API_CONFIG]` | `true` | Enable zip compression for exports | OPTIONAL |
| `ENABLE_TILES` | `ENABLE_TILES` | `[API_CONFIG]` | `false` | Enable Tile Output (Pmtiles and Mbtiles) | OPTIONAL |
| `INDEX_THRESHOLD` | `INDEX_THRESHOLD` | `[API_CONFIG]` | `5000` | Area in sqkm to apply grid/country index filter | OPTIONAL |
| `CELERY_BROKER_URL` | `CELERY_BROKER_URL` | `[CELERY]` | `redis://localhost:6379/0` | Redis connection string for the broker | OPTIONAL |
| `CELERY_RESULT_BACKEND` | `CELERY_RESULT_BACKEND` | `[CELERY]` | `redis://localhost:6379/0` | Redis connection string for the result backend | OPTIONAL |
Expand Down Expand Up @@ -72,6 +73,7 @@ The following are the different configuration options that are accepted.
| `EXPORT_PATH` | `[API_CONFIG]` | Yes | Yes |
| `EXPORT_MAX_AREA_SQKM` | `[API_CONFIG]` | Yes | No |
| `USE_CONNECTION_POOLING` | `[API_CONFIG]` | Yes | Yes |
| `ENABLE_TILES` | `[API_CONFIG]` | Yes | Yes |
| `ALLOW_BIND_ZIP_FILTER` | `[API_CONFIG]` | Yes | Yes |
| `INDEX_THRESHOLD` | `[API_CONFIG]` | No | Yes |
| `CELERY_BROKER_URL` | TBD | Yes | Yes |
Expand Down
119 changes: 77 additions & 42 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@
from psycopg2 import OperationalError, connect
from psycopg2.extras import DictCursor

from src.config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, BUCKET_NAME
from src.config import (
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
BUCKET_NAME,
ENABLE_TILES,
)
from src.config import EXPORT_PATH as export_path
from src.config import INDEX_THRESHOLD as index_threshold
from src.config import USE_CONNECTION_POOLING as use_connection_pooling
Expand Down Expand Up @@ -321,18 +326,29 @@ def ogr_export(query, outputtype, working_dir, dump_temp_path, params):
with open(query_path, "w", encoding="UTF-8") as file:
file.write(query)
# for mbtiles we need additional input as well i.e. minzoom and maxzoom , setting default at max=22 and min=10
if outputtype == RawDataOutputType.MBTILES.value:
cmd = """ogr2ogr -overwrite -f MBTILES -dsco MINZOOM={min_zoom} -dsco MAXZOOM={max_zoom} {export_path} PG:"host={host} user={username} dbname={db} password={password}" -sql @"{pg_sql_select}" -lco ENCODING=UTF-8 -progress""".format(
min_zoom=params.min_zoom,
max_zoom=params.max_zoom,
export_path=dump_temp_path,
host=db_items.get("host"),
username=db_items.get("user"),
db=db_items.get("dbname"),
password=db_items.get("password"),
pg_sql_select=query_path,
)
run_ogr2ogr_cmd(cmd)
if ENABLE_TILES:
if outputtype == RawDataOutputType.MBTILES.value:
if params.min_zoom and params.max_zoom:
cmd = """ogr2ogr -overwrite -f MBTILES -dsco MINZOOM={min_zoom} -dsco MAXZOOM={max_zoom} {export_path} PG:"host={host} user={username} dbname={db} password={password}" -sql @"{pg_sql_select}" -lco ENCODING=UTF-8 -progress""".format(
min_zoom=params.min_zoom,
max_zoom=params.max_zoom,
export_path=dump_temp_path,
host=db_items.get("host"),
username=db_items.get("user"),
db=db_items.get("dbname"),
password=db_items.get("password"),
pg_sql_select=query_path,
)
else:
cmd = """ogr2ogr -overwrite -f MBTILES -dsco ZOOM_LEVEL_AUTO=YES {export_path} PG:"host={host} user={username} dbname={db} password={password}" -sql @"{pg_sql_select}" -lco ENCODING=UTF-8 -progress""".format(
export_path=dump_temp_path,
host=db_items.get("host"),
username=db_items.get("user"),
db=db_items.get("dbname"),
password=db_items.get("password"),
pg_sql_select=query_path,
)
run_ogr2ogr_cmd(cmd)

if outputtype == RawDataOutputType.FLATGEOBUF.value:
cmd = """ogr2ogr -overwrite -f FLATGEOBUF {export_path} PG:"host={host} port={port} user={username} dbname={db} password={password}" -sql @"{pg_sql_select}" -lco ENCODING=UTF-8 -progress VERIFY_BUFFERS=NO""".format(
Expand Down Expand Up @@ -469,12 +485,14 @@ def get_grid_id(geom, cur):
)

@staticmethod
def to_geojson_raw(results):
"""Responsible for geojson writing"""
features = [orjson.loads(row[0]) for row in results]
feature_collection = FeatureCollection(features=features)

return feature_collection
def geojson2tiles(geojson_path, tile_path, tile_layer_name):
"""Responsible for geojson to tiles"""
cmd = """tippecanoe -zg --projection=EPSG:4326 -o {tile_output_path} -l {tile_layer_name} {geojson_input_path} --force""".format(
tile_output_path=tile_path,
tile_layer_name=tile_layer_name,
geojson_input_path=geojson_path,
)
run_ogr2ogr_cmd(cmd)

def extract_current_data(self, exportname):
"""Responsible for Extracting rawdata current snapshot, Initially it creates a geojson file , Generates query , run it with 1000 chunk size and writes it directly to the geojson file and closes the file after dump
Expand All @@ -500,25 +518,59 @@ def extract_current_data(self, exportname):
# Create a exports directory because it does not exist
os.makedirs(working_dir)
# create file path with respect to of output type

dump_temp_file_path = os.path.join(
working_dir,
f"{self.params.file_name if self.params.file_name else 'Export'}.{output_type.lower()}",
)
try:
# currently we have only geojson binding function written other than that we have depend on ogr
if ENABLE_TILES:
if output_type == RawDataOutputType.PMTILES.value:
geojson_path = os.path.join(
working_dir,
f"{self.params.file_name if self.params.file_name else 'Export'}.geojson",
)
RawData.query2geojson(
self.con,
raw_currentdata_extraction_query(
self.params,
g_id=grid_id,
c_id=country,
country_export=country_export,
),
geojson_path,
)
RawData.geojson2tiles(
geojson_path, dump_temp_file_path, self.params.file_name
)
if output_type == RawDataOutputType.MBTILES.value:
RawData.ogr_export(
query=raw_currentdata_extraction_query(
self.params,
grid_id,
country,
ogr_export=True,
country_export=country_export,
),
outputtype=output_type,
dump_temp_path=dump_temp_file_path,
working_dir=working_dir,
params=self.params,
) # uses ogr export to export

if output_type == RawDataOutputType.GEOJSON.value:
RawData.query2geojson(
self.con,
raw_currentdata_extraction_query(
self.params,
g_id=grid_id,
c_id=country,
geometry_dump=geometry_dump,
country_export=country_export,
),
dump_temp_file_path,
) # uses own conversion class
elif output_type == RawDataOutputType.SHAPEFILE.value:
if output_type == RawDataOutputType.SHAPEFILE.value:
(
point_query,
line_query,
Expand All @@ -542,13 +594,12 @@ def extract_current_data(self, exportname):
if self.params.file_name
else "Export",
) # using ogr2ogr
else:
if output_type in ["fgb", "gpkg", "sql", "csv"]:
RawData.ogr_export(
query=raw_currentdata_extraction_query(
self.params,
grid_id,
country,
geometry_dump,
ogr_export=True,
country_export=country_export,
),
Expand Down Expand Up @@ -613,24 +664,7 @@ def get_osm_feature(self, osm_id):

def extract_plain_geojson(self):
"""Gets geojson for small area : Performs direct query with/without geometry"""
query = raw_extract_plain_geojson(self.params, inspect_only=True)
self.cur.execute(query)
analyze_fetched = self.cur.fetchall()
rows = list(
filter(lambda x: x.startswith("rows"), analyze_fetched[0][0].split())
)
approx_returned_rows = rows[0].split("=")[1]
logging.debug("Approximated query output : %s", approx_returned_rows)

if int(approx_returned_rows) > 500:
self.cur.close()
RawData.close_con(self.con)
raise HTTPException(
status_code=500,
detail=f"Query returned {approx_returned_rows} rows (This endpoint supports upto 1000) , Use /current-snapshot/ for larger extraction",
)

extraction_query = raw_extract_plain_geojson(self.params)
extraction_query = raw_currentdata_extraction_query(self.params)
features = []

with self.con.cursor(
Expand Down Expand Up @@ -686,11 +720,12 @@ def upload(self, file_path, file_name, file_suffix="zip"):
sample function call :
S3FileTransfer.transfer(file_path="exports",file_prefix="upload_test")"""
file_name = f"{file_name}.{file_suffix}"
logging.debug("Started Uploading %s from %s", file_name, file_path)
# instantiate upload
start_time = time.time()

try:
self.s_3.upload_file(file_path, BUCKET_NAME, file_name)
self.s_3.upload_file(str(file_path), BUCKET_NAME, str(file_name))
except Exception as ex:
logging.error(ex)
raise ex
Expand Down
4 changes: 4 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
"API_CONFIG", "ALLOW_BIND_ZIP_FILTER", fallback=None
)

# Feature flag for tile (PMTiles / MBTiles) exports.
# The raw value arrives from the environment or config.txt as a *string*,
# so a bare truthy check would treat "False"/"no"/"0" as enabled (any
# non-empty string is truthy). Normalise it to a real bool so that only
# explicit affirmative values turn the feature on; unset/None stays off.
_enable_tiles_raw = os.environ.get("ENABLE_TILES") or config.get(
    "API_CONFIG", "ENABLE_TILES", fallback=None
)
ENABLE_TILES = str(_enable_tiles_raw).strip().lower() in ("true", "1", "yes", "on")

####################

### EXPORT_UPLOAD CONFIG BLOCK
Expand Down
5 changes: 2 additions & 3 deletions src/query_builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,8 @@ def get_country_geojson(c_id):

def raw_currentdata_extraction_query(
params,
g_id,
c_id,
geometry_dump,
g_id=None,
c_id=None,
ogr_export=False,
select_all=False,
country_export=False,
Expand Down
Loading

0 comments on commit 0e3f579

Please sign in to comment.