Industry openstreetmap benchmarks #484

Draft: wants to merge 3 commits into base: branch-25.02
18 changes: 18 additions & 0 deletions examples/SQL+DF-Examples/open-street-map/README.md
@@ -0,0 +1,18 @@
# OpenStreetMap Benchmarks
[OpenStreetMap (OSM)](https://www.openstreetmap.org/copyright/en) is a collaborative, free,
and open-source digital map of the world. It lets users create and edit maps through a simple,
web-based interface. Anyone can contribute by mapping areas they are familiar with, whether
through manual surveys, satellite imagery, or aerial photographs.

Users can analyze the dataset by running Spark ETL jobs for geospatial analysis, traffic-flow
studies, socioeconomic and environmental analysis, route optimization, and more. These queries
can scan terabytes of data, and [Spark Rapids](https://nvidia.github.io/spark-rapids/) can speed
up the entire analysis process.
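
For example, a minimal PySpark session with the RAPIDS Accelerator enabled can query one of the
OSM tables directly. The sketch below assumes the plugin jar is already on the classpath and that
the dataset is available as Parquet at the public GCS path used in the accompanying notebook:

```python
from pyspark.sql import SparkSession

# Enable the RAPIDS Accelerator plugin (jar/classpath assumed already configured).
spark = (SparkSession.builder
         .appName("OSM quick check")
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
         .getOrCreate())

# Register one OSM table and run a trivial sanity query.
spark.read.parquet("gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_ways/") \
     .createOrReplaceTempView("planet_ways")
spark.sql("SELECT COUNT(*) AS num_ways FROM planet_ways").show()
```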

Steps to prepare the dataset:


Performance (CPU vs. GPU) and environment:
IMG


@@ -0,0 +1,244 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "62787244",
"metadata": {},
"source": [
"# Open Street Map Benchmarks\n",
"Below benchmarks show case how user do analysis based on OpenStreetMap dataset."
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "1c3a15d7",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.conf import SparkConf\n",
"from time import time\n",
"RAPIDS_JAR = \"/usr/lib/spark/jars/rapids-4-spark_2.12-24.10.0.jar\"\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "0c3536ad",
"metadata": {},
"outputs": [],
"source": [
"def runQuery(spark, appName, query, retryTimes):\n",
" count = 0\n",
" total_time = 0\n",
" while count < retryTimes:\n",
" start = time()\n",
" spark.sql(query).show(5)\n",
" end = time()\n",
" total_time += round(end - start, 2)\n",
" count = count + 1\n",
" print(\"Retry times : {}, \".format(count) + appName + \" benchmark takes {} seconds\".format(round(end - start, 2)))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "975717da",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
"25/01/20 10:36:07 INFO SparkEnv: Registering MapOutputTracker\n",
"25/01/20 10:36:07 INFO SparkEnv: Registering BlockManagerMaster\n",
"25/01/20 10:36:07 INFO SparkEnv: Registering BlockManagerMasterHeartbeat\n",
"25/01/20 10:36:07 INFO SparkEnv: Registering OutputCommitCoordinator\n",
"25/01/20 10:36:08 WARN RapidsPluginUtils: RAPIDS Accelerator 24.10.0 using cudf 24.10.0, private revision bd4e99e18e20234ee0c54f95f4b0bfce18a6255e\n",
"25/01/20 10:36:08 WARN RapidsPluginUtils: spark.rapids.sql.multiThreadedRead.numThreads is set to 20.\n",
"25/01/20 10:36:08 WARN RapidsPluginUtils: RAPIDS Accelerator is enabled, to disable GPU support set `spark.rapids.sql.enabled` to false.\n",
"25/01/20 10:36:08 WARN RapidsPluginUtils: spark.rapids.sql.explain is set to `NOT_ON_GPU`. Set it to 'NONE' to suppress the diagnostics logging about the query placement on the GPU.\n",
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"--------------------------------------------------\n"
]
}
],
"source": [
"# Common spark settings\n",
"conf = SparkConf()\n",
"conf.setMaster(\"yarn\")\n",
"conf.setAppName(\"Open Street Map Benchmarks\")\n",
"conf.set(\"spark.executor.instances\",\"2\")\n",
"conf.set(\"spark.executor.cores\", \"16\") \n",
"conf.set(\"spark.driver.memory\", \"10g\")\n",
"conf.set(\"spark.driver.cores\", \"1\")\n",
"conf.set(\"spark.sql.parquet.binaryAsString\",\"true\")\n",
"## The tasks will run on GPU memory, so there is no need to set a high host memory\n",
"conf.set(\"spark.executor.memory\", \"16g\")\n",
"\n",
"conf.set(\"spark.locality.wait\", \"0\")\n",
"conf.set(\"spark.rapids.sql.reader.batchSizeBytes\",\"512m\")\n",
"conf.set(\"spark.dynamicAllocation.enabled\", \"false\") \n",
"\n",
"# Plugin settings\n",
"conf.set(\"spark.executor.resource.gpu.amount\", \"1\")\n",
"# 4 tasks will run concurrently per GPU\n",
"conf.set(\"spark.rapids.sql.concurrentGpuTasks\", \"2\")\n",
"# Pinned 8g host memory to transfer data between GPU and host memory\n",
"conf.set(\"spark.rapids.memory.pinnedPool.size\", \"4G\")\n",
"# 16 tasks will run concurrently per executor, as we set spark.executor.cores=16\n",
"conf.set(\"spark.task.resource.gpu.amount\", \"0.0625\") \n",
"conf.set(\"spark.rapids.sql.enabled\", \"true\") \n",
"conf.set(\"spark.plugins\", \"com.nvidia.spark.SQLPlugin\")\n",
"conf.set(\"spark.driver.extraClassPath\", RAPIDS_JAR)\n",
"conf.set(\"spark.executor.extraClassPath\", RAPIDS_JAR)\n",
"conf.set(\"spark.jars\", RAPIDS_JAR)\n",
"# Create spark session\n",
"spark = SparkSession.builder.config(conf=conf).getOrCreate()\n",
"# Load dataframe and create tempView\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_changesets/\").createOrReplaceTempView(\"history_changesets\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_layers/\").createOrReplaceTempView(\"history_layers\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_nodes/\").createOrReplaceTempView(\"history_nodes\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_relations/\").createOrReplaceTempView(\"history_relations\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/history_ways/\").createOrReplaceTempView(\"history_ways\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_changesets/\").createOrReplaceTempView(\"planet_changesets\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features/\").createOrReplaceTempView(\"planet_features\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_lines/\").createOrReplaceTempView(\"planet_features_lines\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_multilinestrings/\").createOrReplaceTempView(\"planet_features_multilinestrings\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_multipolygons/\").createOrReplaceTempView(\"planet_features_multipolygons\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_other_relations/\").createOrReplaceTempView(\"planet_features_other_relations\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_features_points/\").createOrReplaceTempView(\"planet_features_points\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_layers/\").createOrReplaceTempView(\"planet_layers\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_nodes/\").createOrReplaceTempView(\"planet_nodes\")\n",
"# spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_relations/\").createOrReplaceTempView(\"planet_relations\")\n",
"spark.read.parquet(\"gs://bigquery_public_data_for_spark/geo_openstreetmap/planet_ways/\").createOrReplaceTempView(\"planet_ways\")\n",
"print(\"-\"*50)"
]
},
{
"cell_type": "markdown",
"id": "7136eb63",
"metadata": {},
"source": [
"### Calculate the building numbers over 5 levels"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "dd12d749",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"25/01/20 10:37:35 WARN GpuOverrides: \n",
"!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n",
" @Partitioning <SinglePartition$> could run on GPU\n",
"\n",
"25/01/20 10:37:35 WARN GpuOverrides: \n",
"!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n",
" @Partitioning <SinglePartition$> could run on GPU\n",
"\n",
"25/01/20 10:37:35 WARN GpuOverrides: \n",
"!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n",
" @Partitioning <SinglePartition$> could run on GPU\n",
"\n",
"25/01/20 10:38:12 WARN GpuOverrides: 80][Stage 2:==============>(279 + 1) / 280]\n",
"!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n",
" @Partitioning <SinglePartition$> could run on GPU\n",
"\n",
"25/01/20 10:40:11 WARN GpuOverrides: ==========================>(279 + 1) / 280]\n",
"!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n",
" @Partitioning <SinglePartition$> could run on GPU\n",
"\n",
"25/01/20 10:40:11 WARN GpuOverrides: \n",
"!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it\n",
" @Partitioning <SinglePartition$> could run on GPU\n",
"\n",
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+-------+-------------------+---------+-------------------+\n",
"| id|version| username|changeset| osm_timestamp|\n",
"+---------+-------+-------------------+---------+-------------------+\n",
"|109568528| 10| Stalker61| 91879126|2020-10-02 18:25:07|\n",
"|236002978| 3| MaksimTo| 54555625|2017-12-12 03:43:01|\n",
"| 85820500| 4| Achoin| 50043141|2017-07-04 19:46:51|\n",
"|505004057| 5| mbouzada| 58562021|2018-04-30 17:31:20|\n",
"|252765823| 1|catastro_pontevedra| 19552118|2013-12-20 16:01:44|\n",
"+---------+-------+-------------------+---------+-------------------+\n",
"only showing top 5 rows\n",
"\n",
"Retry times : 1, 5 levels building numbers benchmark takes 159.22 seconds\n"
]
}
],
"source": [
"query = \"\"\"\n",
"WITH exploded_tags AS (\n",
" SELECT id, version, username, changeset, osm_timestamp, tag.key AS tag_key, tag.value AS tag_value\n",
" FROM planet_ways\n",
" LATERAL VIEW explode(all_tags) AS tag\n",
")\n",
"\n",
"SELECT id, version, username, changeset, osm_timestamp\n",
"FROM exploded_tags\n",
"WHERE tag_key = 'building' \n",
" AND EXISTS (\n",
" SELECT 1\n",
" FROM exploded_tags AS inner_tags\n",
" WHERE inner_tags.id = exploded_tags.id \n",
" AND inner_tags.tag_key = 'building:levels' \n",
" AND CAST(inner_tags.tag_value AS INT) > 5\n",
" )\n",
"\n",
"\"\"\"\n",
"runQuery(spark,\"5 levels building numbers\",query,1)"
]
},
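  {
   "cell_type": "markdown",
   "id": "b7f0c2a1",
   "metadata": {},
   "source": [
    "A sketch of an alternative formulation, assuming `all_tags` is an `array<struct<key,value>>` as the `explode` above implies: Spark SQL's higher-order `exists()` function can test the tags in place, avoiding the explode-plus-`EXISTS` self-join. This cell is illustrative only and was not part of the recorded benchmark run."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c8d1e3f2",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Alternative sketch: probe all_tags with the higher-order exists() function\n",
    "# instead of exploding it and re-joining with EXISTS. Untimed; for comparison only.\n",
    "alt_query = \"\"\"\n",
    "SELECT id, version, username, changeset, osm_timestamp\n",
    "FROM planet_ways\n",
    "WHERE exists(all_tags, t -> t.key = 'building')\n",
    "  AND exists(all_tags, t -> t.key = 'building:levels' AND CAST(t.value AS INT) > 5)\n",
    "\"\"\"\n",
    "# runQuery(spark, \"5 levels building numbers (exists)\", alt_query, 1)"
   ]
  },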
{
"cell_type": "code",
"execution_count": null,
"id": "d9b3d5db",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}