
Commit

disabled checkpointing by default, fixed RST_Write failure when checkpointing disabled, added checkpointing explanation and instructions to documentation
sllynn committed Oct 22, 2024
1 parent 412d44f commit ba042ae
Showing 9 changed files with 76 additions and 26 deletions.
13 changes: 4 additions & 9 deletions docs/source/api/raster-functions.rst
@@ -26,20 +26,15 @@ e.g. :code:`spark.read.format("gdal")`
* The Mosaic raster tile schema changed in v0.4.1 to the following:
  :code:`tile:struct<index_id:bigint, tile:binary, metadata:map<string, string>>`. All APIs that use tiles now
  follow this schema; a short illustration follows this list.
* The function :ref:`rst_maketiles` allows the raster tile schema to hold either a path pointer (string)
  or a byte array representation of the source raster. It also supports optional checkpointing for increased
  performance during chains of raster operations.
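
A minimal illustration of the tile schema (an editorial sketch, not part of this commit), assuming a DataFrame
:code:`df` with a :code:`tile` column of the above type:

.. code-block:: python

    # each tile is a struct of (index_id, tile, metadata)
    df.select(
        "tile.index_id",  # bigint grid cell id
        "tile.tile",      # binary raster payload (or string path pointer)
        "tile.metadata",  # map<string, string> of raster metadata
    ).show()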

Updates to the raster features for 0.4.1
----------------------------------------

* Scala does not have a :code:`df.display()` method while Python does. In practice you would most often call
  :code:`display(df)` in Scala for prettier output, but for brevity we write :code:`df.show` in the Scala examples.
* Mosaic can write rasters from a DataFrame to a target directory in DBFS using the function :ref:`rst_write`; a
  short sketch follows below.
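
A hedged sketch of :code:`rst_write` usage (not part of this commit), assuming Mosaic is enabled and :code:`df`
has a :code:`tile` column; the target directory below is hypothetical:

.. code-block:: python

    import mosaic as mos

    # write each tile's raster out to DBFS; returned tiles reference the new paths
    written = df.select(mos.rst_write("tile", "/dbfs/tmp/mosaic/write-tile"))
    written.show()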

.. note:: For Mosaic versions > 0.4.0 you can use the revamped :code:`setup_gdal()` function or the new
   :code:`setup_fuse_install()` function. These functions will configure an init script in your preferred
   Workspace, Volume, or DBFS location to install GDAL on your cluster. See
   :doc:`Install and Enable GDAL with Mosaic </usage/install-gdal>` for more details.

.. note:: For complex operations and/or when working with large rasters, Mosaic offers the option of employing
   checkpointing to write intermediate results to disk. Follow the instructions in
   :doc:`Checkpointing </usage/raster-checkpointing>` to enable this feature.

Functions
#########

47 changes: 47 additions & 0 deletions docs/source/usage/raster-checkpointing.rst
@@ -0,0 +1,47 @@
===================================
Checkpointing for raster operations
===================================

Mosaic offers the ability to checkpoint raster operations to disk. This is useful when working with large rasters
and complex operations that require multiple stages of computation. Checkpointing saves intermediate results to
the cloud object store, from where they can be loaded back into memory at a later stage. This reduces the memory
required to hold intermediate results and can improve performance by reducing the amount of data transferred
between nodes during wide transformations.

Checkpointing is enabled by setting the :code:`spark.databricks.labs.mosaic.raster.use.checkpoint` configuration to :code:`true`.
By default, checkpointing is disabled. When checkpointing is enabled, Mosaic will save intermediate results to the checkpoint directory
specified by the :code:`spark.databricks.labs.mosaic.raster.checkpoint` configuration.
The checkpoint directory must be a valid DBFS path (by default it is set to :code:`/dbfs/tmp/mosaic/raster/checkpoint`).
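
For instance, these configurations can be set directly on the Spark session (a minimal sketch, not part of this
commit; the helper functions described below are the more convenient route):

.. code-block:: python

    # enable checkpointing and point it at the default checkpoint directory
    spark.conf.set("spark.databricks.labs.mosaic.raster.use.checkpoint", "true")
    spark.conf.set("spark.databricks.labs.mosaic.raster.checkpoint", "/dbfs/tmp/mosaic/raster/checkpoint")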

There are also a number of helper functions that can be used to manage checkpointing in Mosaic.

The simplest way to enable checkpointing is to specify a checkpoint directory when calling :code:`enable_gdal()` from the Python interface...

.. code-block:: python

    import mosaic as mos

    mos.enable_mosaic(spark, dbutils)
    mos.enable_gdal(spark, checkpoint_dir="/dbfs/tmp/mosaic/raster/checkpoint")

... or directly from Scala using :code:`MosaicGDAL.enableGDALWithCheckpoint()`:

.. code-block:: scala

    import com.databricks.labs.mosaic.functions.MosaicContext
    import com.databricks.labs.mosaic.gdal.MosaicGDAL
    import com.databricks.labs.mosaic.{H3, JTS}

    val mosaicContext = MosaicContext.build(H3, JTS)
    import mosaicContext.functions._

    MosaicGDAL.enableGDALWithCheckpoint(spark, "/dbfs/tmp/mosaic/raster/checkpoint")

Checkpointing can be modified within the Python interface using the following functions (a usage sketch follows
the list):

* :code:`update_checkpoint_path(spark: SparkSession, path: str)`
* :code:`set_checkpoint_on(spark: SparkSession)`
* :code:`set_checkpoint_off(spark: SparkSession)`
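
A short usage sketch for these helpers (not part of this commit), assuming a notebook with an active
:code:`spark` session; the alternate directory is hypothetical:

.. code-block:: python

    import mosaic as mos

    # redirect checkpointing to a different DBFS directory
    mos.update_checkpoint_path(spark, "/dbfs/tmp/mosaic/raster/checkpoint_alt")

    # toggle checkpointing off and back on
    mos.set_checkpoint_off(spark)
    mos.set_checkpoint_on(spark)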

1 change: 1 addition & 0 deletions docs/source/usage/usage.rst
@@ -11,4 +11,5 @@ Usage
quickstart
kepler
automatic-sql-registration
raster-checkpointing

@@ -17,7 +17,7 @@ import org.gdal.osr
import org.gdal.osr.SpatialReference
import org.locationtech.proj4j.CRSFactory

import java.nio.file.{Files, Paths, StandardCopyOption}
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util.{Locale, Vector => JVector}
import scala.collection.JavaConverters.dictionaryAsScalaMapConverter
import scala.util.{Failure, Success, Try}
@@ -602,7 +602,7 @@ case class MosaicRasterGDAL(
}
val byteArray = FileUtils.readBytes(readPath)
if (dispose) RasterCleaner.dispose(this)
if (readPath != PathUtils.getCleanPath(parentPath)) {
if (readPath != PathUtils.getCleanPath(parentPath) && PathUtils.isTmpLocation(readPath)) {
Files.deleteIfExists(Paths.get(readPath))
if (readPath.endsWith(".zip")) {
val nonZipPath = readPath.replace(".zip", "")
@@ -651,7 +651,10 @@
} else {
val thisPath = Paths.get(this.path)
val fromDir = thisPath.getParent
val toDir = Paths.get(newPath).getParent
val toDir = Paths.get(newPath) match {
case p: Path if Files.isDirectory(p) => p
case p: Path => p.getParent()
}
val stemRegex = PathUtils.getStemRegex(this.path)
PathUtils.wildcardCopy(fromDir.toString, toDir.toString, stemRegex)
if (dispose) RasterCleaner.dispose(this)
@@ -44,7 +44,7 @@ case class RST_Write(
// serialize data type
override def dataType: DataType = {
require(dirExpr.isInstanceOf[Literal])
RasterTileType(expressionConfig.getCellIdType, StringType, useCheckpoint = false)
RasterTileType(expressionConfig.getCellIdType, inputExpr, expressionConfig.isRasterUseCheckpoint)
}

/**
2 changes: 1 addition & 1 deletion src/main/scala/com/databricks/labs/mosaic/package.scala
@@ -23,7 +23,7 @@ package object mosaic {
val MOSAIC_RASTER_CHECKPOINT = "spark.databricks.labs.mosaic.raster.checkpoint"
val MOSAIC_RASTER_CHECKPOINT_DEFAULT = "/dbfs/tmp/mosaic/raster/checkpoint"
val MOSAIC_RASTER_USE_CHECKPOINT = "spark.databricks.labs.mosaic.raster.use.checkpoint"
val MOSAIC_RASTER_USE_CHECKPOINT_DEFAULT = "true"
val MOSAIC_RASTER_USE_CHECKPOINT_DEFAULT = "false"
val MOSAIC_RASTER_TMP_PREFIX = "spark.databricks.labs.mosaic.raster.tmp.prefix"
val MOSAIC_RASTER_TMP_PREFIX_DEFAULT = "/tmp"
val MOSAIC_RASTER_BLOCKSIZE = "spark.databricks.labs.mosaic.raster.blocksize"
@@ -5,11 +5,14 @@ import com.databricks.labs.mosaic.core.index.H3IndexSystem
import com.databricks.labs.mosaic.functions.MosaicContext
import com.databricks.labs.mosaic.test.MosaicSpatialQueryTest
import org.apache.spark.sql.test.SharedSparkSessionGDAL
import org.scalatest.Tag
import org.scalatest.matchers.must.Matchers.{be, noException}
import org.scalatest.matchers.should.Matchers.an

import java.nio.file.{Files, Paths}

object ExcludeLocalTag extends Tag("ExcludeIfLocal")

class RasterAsGridReaderTest extends MosaicSpatialQueryTest with SharedSparkSessionGDAL {

test("Read netcdf with Raster As Grid Reader") {
@@ -33,7 +36,7 @@ class RasterAsGridReaderTest extends MosaicSpatialQueryTest with SharedSparkSess

}

test("Read grib with Raster As Grid Reader") {
test("Read grib with Raster As Grid Reader", ExcludeLocalTag) {
assume(System.getProperty("os.name") == "Linux")
MosaicContext.build(H3IndexSystem, JTS)

@@ -102,7 +102,6 @@ trait RST_DTMFromGeomsBehaviours extends SharedSparkSessionGDAL {
$"grid_size_x", $"grid_size_y", $"pixel_size_x", $"pixel_size_y"
).cache()
noException should be thrownBy result.collect()
result.select($"tile").show(truncate = false)
}

def multiRegionTriangulationRasterizeTest(indexSystem: IndexSystem, geometryAPI: GeometryAPI): Unit = {
@@ -1,11 +1,11 @@
package com.databricks.labs.mosaic.expressions.raster

import com.databricks.labs.mosaic.{MOSAIC_RASTER_USE_CHECKPOINT, MOSAIC_RASTER_USE_CHECKPOINT_DEFAULT}
import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
import com.databricks.labs.mosaic.core.index.IndexSystem
import com.databricks.labs.mosaic.functions.MosaicContext
import com.databricks.labs.mosaic.utils.FileUtils
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.scalatest.matchers.should.Matchers.{be, convertToAnyShouldWrapper}

import java.nio.file.{Files, Paths}
@@ -25,7 +25,7 @@ trait RST_WriteBehaviors extends QueryTest {
mc.register(sc)
import mc.functions._

val writeDir = "/tmp/mosaic_tmp/write-tile"
val writeDir = "/mnt/mosaic_tmp/write-tile"
val writeDirJava = Paths.get(writeDir)
Try(FileUtils.deleteRecursively(writeDir, keepRoot = false))
Files.createDirectories(writeDirJava)
@@ -42,13 +42,12 @@ trait RST_WriteBehaviors extends QueryTest {
.filter(!rst_isempty($"tile"))
.select(rst_write($"tile", writeDir))
.first()
.asInstanceOf[GenericRowWithSchema].get(0)

val createInfo1 = gridTiles1.asInstanceOf[GenericRowWithSchema].getAs[Map[String, String]](2)
val createInfo1 = gridTiles1.getStruct(0).getAs[Map[String, String]](2)
val path1Java = Paths.get(createInfo1("path"))

Files.list(path1Java.getParent).count() should be (1)
Try(FileUtils.deleteRecursively(writeDir, keepRoot = false))
Files.list(path1Java.getParent).count() shouldBe 1
FileUtils.deleteRecursively(writeDir, keepRoot = false)
Files.createDirectories(writeDirJava)
Files.list(Paths.get(writeDir)).count() should be (0)

@@ -63,16 +62,19 @@ trait RST_WriteBehaviors extends QueryTest {
|)
|select rst_write(tile, '$writeDir') as result
|from subquery
|where not rst_isempty(tile)
|""".stripMargin)
.first()
.asInstanceOf[GenericRowWithSchema].get(0)

val createInfo2 = gridTilesSQL.asInstanceOf[GenericRowWithSchema].getAs[Map[String, String]](2)
val createInfo2 = gridTilesSQL.getStruct(0).getAs[Map[String, String]](2)
val path2Java = Paths.get(createInfo2("path"))

// should equal 2: original file plus file written during checkpointing
Files.list(path2Java.getParent).count() should be (2)

val expectedFileCount = spark.conf.get(MOSAIC_RASTER_USE_CHECKPOINT, MOSAIC_RASTER_USE_CHECKPOINT_DEFAULT) match {
case "true" => 2
case _ => 1
}
Files.list(path2Java.getParent).count() should be (expectedFileCount)
Try(FileUtils.deleteRecursively(writeDir, keepRoot = false))
Files.createDirectories(writeDirJava)
Files.list(Paths.get(writeDir)).count() should be (0)
