From ba042ae261bfdaa2d8df0e20d7a2d4177ce8d415 Mon Sep 17 00:00:00 2001
From: sllynn
Date: Tue, 22 Oct 2024 18:16:46 +0100
Subject: [PATCH] disabled checkpointing by default, fixed RST_Write failure
 when checkpointing disabled, added checkpointing explanation and instructions
 to documentation

---
 docs/source/api/raster-functions.rst          | 13 ++---
 docs/source/usage/raster-checkpointing.rst    | 70 ++++++++++++++++++++++++++
 docs/source/usage/usage.rst                   |  1 +
 .../core/raster/gdal/MosaicRasterGDAL.scala   |  9 ++--
 .../mosaic/expressions/raster/RST_Write.scala |  2 +-
 .../com/databricks/labs/mosaic/package.scala  |  2 +-
 .../multiread/RasterAsGridReaderTest.scala    |  5 +-
 .../raster/RST_DTMFromGeomsBehaviours.scala   |  1 -
 .../raster/RST_WriteBehaviors.scala           | 22 +++++----
 9 files changed, 99 insertions(+), 26 deletions(-)
 create mode 100644 docs/source/usage/raster-checkpointing.rst

diff --git a/docs/source/api/raster-functions.rst b/docs/source/api/raster-functions.rst
index c76f98d58..db18f0171 100644
--- a/docs/source/api/raster-functions.rst
+++ b/docs/source/api/raster-functions.rst
@@ -26,20 +26,15 @@ e.g. :code:`spark.read.format("gdal")`
 
     * The Mosaic raster tile schema changed in v0.4.1 to the following:
       :code:`>`. All APIs that use tiles now follow this schema.
-    * The function :ref:`rst_maketiles` allows for the raster tile schema to hold either a path pointer (string)
-      or a byte array representation of the source raster. It also supports optional checkpointing for increased
-      performance during chains of raster operations.
-
-Updates to the raster features for 0.4.1
-----------------------------------------
-
-    * Scala does not have a :code:`df.display()` method while python does. In practice you would most often call
-      :code:`display(df)` in scala for a prettier output, but for brevity, we write :code:`df.show` in scala.
+    * Mosaic can write rasters from a DataFrame to a target directory in DBFS using the function :ref:`rst_write`.
 
 .. note:: For mosaic versions > 0.4.0 you can use the revamped setup_gdal function or new setup_fuse_install.
    These functions will configure an init script in your preferred Workspace, Volume, or DBFS location
   to install GDAL on your cluster.
    See :doc:`Install and Enable GDAL with Mosaic ` for more details.
 
+.. note:: For complex operations and/or when working with large rasters, Mosaic offers the option of checkpointing
+   to write intermediate results to disk. Follow the instructions in :doc:`Checkpointing </usage/raster-checkpointing>` to enable this feature.
+
 Functions
 #########

diff --git a/docs/source/usage/raster-checkpointing.rst b/docs/source/usage/raster-checkpointing.rst
new file mode 100644
index 000000000..5d35ca5ad
--- /dev/null
+++ b/docs/source/usage/raster-checkpointing.rst
@@ -0,0 +1,70 @@
+===================================
+Checkpointing for raster operations
+===================================
+
+Mosaic offers the ability to checkpoint raster operations to disk. This is useful when working with large rasters and
+complex operations that may require multiple stages of computation. Checkpointing saves intermediate results to the
+cloud object store, from where they can be loaded back into memory at a later stage.
+This can reduce the amount of memory required to hold intermediate results and can improve performance by reducing
+the amount of data that needs to be transferred between nodes during wide transformations.
+
+Checkpointing is enabled by setting the :code:`spark.databricks.labs.mosaic.raster.use.checkpoint` configuration to :code:`true`.
+By default, checkpointing is disabled. When checkpointing is enabled, Mosaic saves intermediate results to the checkpoint
+directory specified by the :code:`spark.databricks.labs.mosaic.raster.checkpoint` configuration.
+The checkpoint directory must be a valid DBFS path (by default it is set to :code:`/dbfs/tmp/mosaic/raster/checkpoint`).
+
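+The two configuration values can also be set directly on the Spark session. The following is a minimal sketch using
+the standard Spark configuration API and the Mosaic configuration keys described above; set these before enabling Mosaic:
+
+    .. code-block:: python
+
+        # enable checkpointing and point it at a writable DBFS location
+        spark.conf.set("spark.databricks.labs.mosaic.raster.use.checkpoint", "true")
+        spark.conf.set("spark.databricks.labs.mosaic.raster.checkpoint", "/dbfs/tmp/mosaic/raster/checkpoint")
+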
+There are also a number of helper functions that can be used to manage checkpointing in Mosaic.
+
+The simplest way to enable checkpointing is to specify a checkpoint directory when calling :code:`enable_gdal()`
+from the Python interface...
+
+    .. code-block:: python
+
+        import mosaic as mos
+
+        mos.enable_mosaic(spark, dbutils)
+        mos.enable_gdal(spark, checkpoint_dir="/dbfs/tmp/mosaic/raster/checkpoint")
+
+
+... or directly from Scala using :code:`MosaicGDAL.enableGDALWithCheckpoint()`:
+
+    .. code-block:: scala
+
+        import com.databricks.labs.mosaic.functions.MosaicContext
+        import com.databricks.labs.mosaic.gdal.MosaicGDAL
+        import com.databricks.labs.mosaic.H3
+        import com.databricks.labs.mosaic.JTS
+
+        val mosaicContext = MosaicContext.build(H3, JTS)
+        import mosaicContext.functions._
+
+        MosaicGDAL.enableGDALWithCheckpoint(spark, "/dbfs/tmp/mosaic/raster/checkpoint")
+
+Checkpointing can also be adjusted at runtime from the Python interface using the following functions; a short usage
+sketch follows the list.
+
+    * :code:`update_checkpoint_path(spark: SparkSession, path: str)`
+    * :code:`set_checkpoint_on(spark: SparkSession)`
+    * :code:`set_checkpoint_off(spark: SparkSession)`
+
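+As a usage sketch (assuming a notebook session where :code:`spark` is the active SparkSession, and using only the
+helper functions listed above):
+
+    .. code-block:: python
+
+        import mosaic as mos
+
+        # switch checkpointing on, point it at a different location, then switch it off again
+        mos.set_checkpoint_on(spark)
+        mos.update_checkpoint_path(spark, "/dbfs/tmp/mosaic/raster/checkpoint")
+        mos.set_checkpoint_off(spark)
+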
diff --git a/docs/source/usage/usage.rst b/docs/source/usage/usage.rst
index d7c8c3e10..f829ac043 100644
--- a/docs/source/usage/usage.rst
+++ b/docs/source/usage/usage.rst
@@ -11,4 +11,5 @@ Usage
 
    quickstart
    kepler
    automatic-sql-registration
+   raster-checkpointing

diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala
index 4f4cf50f8..f94886f5d 100644
--- a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala
+++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala
@@ -17,7 +17,7 @@ import org.gdal.osr
 import org.gdal.osr.SpatialReference
 import org.locationtech.proj4j.CRSFactory
 
-import java.nio.file.{Files, Paths, StandardCopyOption}
+import java.nio.file.{Files, Path, Paths, StandardCopyOption}
 import java.util.{Locale, Vector => JVector}
 import scala.collection.JavaConverters.dictionaryAsScalaMapConverter
 import scala.util.{Failure, Success, Try}
@@ -602,7 +602,7 @@ case class MosaicRasterGDAL(
         }
         val byteArray = FileUtils.readBytes(readPath)
         if (dispose) RasterCleaner.dispose(this)
-        if (readPath != PathUtils.getCleanPath(parentPath)) {
+        if (readPath != PathUtils.getCleanPath(parentPath) && PathUtils.isTmpLocation(readPath)) {
             Files.deleteIfExists(Paths.get(readPath))
             if (readPath.endsWith(".zip")) {
                 val nonZipPath = readPath.replace(".zip", "")
@@ -651,7 +651,10 @@
         } else {
             val thisPath = Paths.get(this.path)
             val fromDir = thisPath.getParent
-            val toDir = Paths.get(newPath).getParent
+            val toDir = Paths.get(newPath) match {
+                case p: Path if Files.isDirectory(p) => p
+                case p: Path => p.getParent()
+            }
             val stemRegex = PathUtils.getStemRegex(this.path)
             PathUtils.wildcardCopy(fromDir.toString, toDir.toString, stemRegex)
             if (dispose) RasterCleaner.dispose(this)

diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Write.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Write.scala
index b314d5099..8c37dddd8 100644
--- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Write.scala
+++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/RST_Write.scala
@@ -44,7 +44,7 @@ case class RST_Write(
     // serialize data type
     override def dataType: DataType = {
         require(dirExpr.isInstanceOf[Literal])
-        RasterTileType(expressionConfig.getCellIdType, StringType, useCheckpoint = false)
+        RasterTileType(expressionConfig.getCellIdType, inputExpr, expressionConfig.isRasterUseCheckpoint)
     }
 
     /**

diff --git a/src/main/scala/com/databricks/labs/mosaic/package.scala b/src/main/scala/com/databricks/labs/mosaic/package.scala
index 06dfa3264..32f8015c1 100644
--- a/src/main/scala/com/databricks/labs/mosaic/package.scala
+++ b/src/main/scala/com/databricks/labs/mosaic/package.scala
@@ -23,7 +23,7 @@ package object mosaic {
     val MOSAIC_RASTER_CHECKPOINT = "spark.databricks.labs.mosaic.raster.checkpoint"
     val MOSAIC_RASTER_CHECKPOINT_DEFAULT = "/dbfs/tmp/mosaic/raster/checkpoint"
     val MOSAIC_RASTER_USE_CHECKPOINT = "spark.databricks.labs.mosaic.raster.use.checkpoint"
-    val MOSAIC_RASTER_USE_CHECKPOINT_DEFAULT = "true"
+    val MOSAIC_RASTER_USE_CHECKPOINT_DEFAULT = "false"
     val MOSAIC_RASTER_TMP_PREFIX = "spark.databricks.labs.mosaic.raster.tmp.prefix"
     val MOSAIC_RASTER_TMP_PREFIX_DEFAULT = "/tmp"
    val MOSAIC_RASTER_BLOCKSIZE = "spark.databricks.labs.mosaic.raster.blocksize"

diff --git a/src/test/scala/com/databricks/labs/mosaic/datasource/multiread/RasterAsGridReaderTest.scala b/src/test/scala/com/databricks/labs/mosaic/datasource/multiread/RasterAsGridReaderTest.scala
index 721201eaa..33f42f619 100644
--- a/src/test/scala/com/databricks/labs/mosaic/datasource/multiread/RasterAsGridReaderTest.scala
+++ b/src/test/scala/com/databricks/labs/mosaic/datasource/multiread/RasterAsGridReaderTest.scala
@@ -5,11 +5,14 @@ import com.databricks.labs.mosaic.core.index.H3IndexSystem
 import com.databricks.labs.mosaic.functions.MosaicContext
 import com.databricks.labs.mosaic.test.MosaicSpatialQueryTest
 import org.apache.spark.sql.test.SharedSparkSessionGDAL
+import org.scalatest.Tag
 import org.scalatest.matchers.must.Matchers.{be, noException}
 import org.scalatest.matchers.should.Matchers.an
 
 import java.nio.file.{Files, Paths}
 
+object ExcludeLocalTag extends Tag("ExcludeIfLocal")
+
 class RasterAsGridReaderTest extends MosaicSpatialQueryTest with SharedSparkSessionGDAL {
 
     test("Read netcdf with Raster As Grid Reader") {
@@ -33,7 +36,7 @@ class RasterAsGridReaderTest extends MosaicSpatialQueryTest with SharedSparkSess
 
     }
 
-    test("Read grib with Raster As Grid Reader") {
+    test("Read grib with Raster As Grid Reader", ExcludeLocalTag) {
         assume(System.getProperty("os.name") == "Linux")
 
         MosaicContext.build(H3IndexSystem, JTS)

diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_DTMFromGeomsBehaviours.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_DTMFromGeomsBehaviours.scala
index d3afc1788..17eafe14e 100644
--- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_DTMFromGeomsBehaviours.scala
+++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_DTMFromGeomsBehaviours.scala
@@ -102,7 +102,6 @@ trait RST_DTMFromGeomsBehaviours extends SharedSparkSessionGDAL {
             $"grid_size_x", $"grid_size_y", $"pixel_size_x", $"pixel_size_y"
         ).cache()
         noException should be thrownBy result.collect()
-        result.select($"tile").show(truncate = false)
     }
 
     def multiRegionTriangulationRasterizeTest(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = {

diff --git a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_WriteBehaviors.scala b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_WriteBehaviors.scala
index e2dee079b..f65559bbd 100644
--- a/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_WriteBehaviors.scala
+++ b/src/test/scala/com/databricks/labs/mosaic/expressions/raster/RST_WriteBehaviors.scala
@@ -1,11 +1,11 @@
 package com.databricks.labs.mosaic.expressions.raster
 
+import com.databricks.labs.mosaic.{MOSAIC_RASTER_USE_CHECKPOINT, MOSAIC_RASTER_USE_CHECKPOINT_DEFAULT}
 import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
 import com.databricks.labs.mosaic.core.index.IndexSystem
 import com.databricks.labs.mosaic.functions.MosaicContext
 import com.databricks.labs.mosaic.utils.FileUtils
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.scalatest.matchers.should.Matchers.{be, convertToAnyShouldWrapper}
 
 import java.nio.file.{Files, Paths}
@@ -25,7 +25,7 @@ trait RST_WriteBehaviors extends QueryTest {
         mc.register(sc)
         import mc.functions._
 
-        val writeDir = "/tmp/mosaic_tmp/write-tile"
+        val writeDir = "/mnt/mosaic_tmp/write-tile"
         val writeDirJava = Paths.get(writeDir)
         Try(FileUtils.deleteRecursively(writeDir, keepRoot = false))
         Files.createDirectories(writeDirJava)
@@ -42,13 +42,12 @@ trait RST_WriteBehaviors extends QueryTest {
             .filter(!rst_isempty($"tile"))
             .select(rst_write($"tile", writeDir))
             .first()
-            .asInstanceOf[GenericRowWithSchema].get(0)
 
-        val createInfo1 = gridTiles1.asInstanceOf[GenericRowWithSchema].getAs[Map[String, String]](2)
+        val createInfo1 = gridTiles1.getStruct(0).getAs[Map[String, String]](2)
         val path1Java = Paths.get(createInfo1("path"))
 
-        Files.list(path1Java.getParent).count() should be (1)
-        Try(FileUtils.deleteRecursively(writeDir, keepRoot = false))
+        Files.list(path1Java.getParent).count() shouldBe 1
+        FileUtils.deleteRecursively(writeDir, keepRoot = false)
         Files.createDirectories(writeDirJava)
         Files.list(Paths.get(writeDir)).count() should be (0)
 
@@ -63,16 +62,19 @@ trait RST_WriteBehaviors extends QueryTest {
                  |)
                  |select rst_write(tile, '$writeDir') as result
                  |from subquery
-                 |where not rst_isempty(tile)
                  |""".stripMargin)
             .first()
-            .asInstanceOf[GenericRowWithSchema].get(0)
 
-        val createInfo2 = gridTilesSQL.asInstanceOf[GenericRowWithSchema].getAs[Map[String, String]](2)
+        val createInfo2 = gridTilesSQL.getStruct(0).getAs[Map[String, String]](2)
         val path2Java = Paths.get(createInfo2("path"))
 
         // should equal 2: original file plus file written during checkpointing
-        Files.list(path2Java.getParent).count() should be (2)
+
+        val expectedFileCount = spark.conf.get(MOSAIC_RASTER_USE_CHECKPOINT, MOSAIC_RASTER_USE_CHECKPOINT_DEFAULT) match {
+            case "true" => 2
+            case _ => 1
+        }
+        Files.list(path2Java.getParent).count() should be (expectedFileCount)
         Try(FileUtils.deleteRecursively(writeDir, keepRoot = false))
         Files.createDirectories(writeDirJava)
         Files.list(Paths.get(writeDir)).count() should be (0)