Skip to content

Commit

Permalink
Merge pull request #252 from wri/gtc-3055
Browse files Browse the repository at this point in the history
GTC-3055 Added new GHG (green-house gas) analysis
  • Loading branch information
danscales authored Jan 27, 2025
2 parents 2315a4b + 1656df0 commit 9af31db
Show file tree
Hide file tree
Showing 29 changed files with 1,096 additions and 46 deletions.
32 changes: 32 additions & 0 deletions src/main/resources/raster-catalog-pro.json
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,38 @@
{
"name":"col_frontera_agricola",
"source_uri":"s3://gfw-data-lake/col_frontera_agricola/v2024/raster/epsg-4326/{grid_size}/{row_count}/category/gdal-geotiff/{tile_id}.tif"
},
{
"name":"gfw_forest_flux_full_extent_gross_emissions_co2_only_biomass_soil",
"source_uri": "s3://gfw-data-lake/gfw_forest_flux_full_extent_gross_emissions_co2_only_biomass_soil/v20240402/raster/epsg-4326/{grid_size}/{row_count}/Mg_CO2_ha-1/geotiff/{tile_id}.tif"
},
{
"name":"gfw_forest_flux_full_extent_gross_emissions_non_co2_biomass_soil",
"source_uri": "s3://gfw-data-lake/gfw_forest_flux_full_extent_gross_emissions_non_co2_biomass_soil/v20240402/raster/epsg-4326/{grid_size}/{row_count}/Mg_CO2e_ha-1/geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_coco",
"source_uri":"s3://gfw-data-lake/mapspam_yield_coco/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_coff",
"source_uri":"s3://gfw-data-lake/mapspam_yield_coff/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_oilp",
"source_uri":"s3://gfw-data-lake/mapspam_yield_oilp/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_rubb",
"source_uri":"s3://gfw-data-lake/mapspam_yield_rubb/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_soyb",
"source_uri":"s3://gfw-data-lake/mapspam_yield_soyb/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_sugc",
"source_uri":"s3://gfw-data-lake/mapspam_yield_sugc/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
}
]
}
5 changes: 3 additions & 2 deletions src/main/scala/org/globalforestwatch/features/Feature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ object Feature {
case "modis" => FireAlertModisFeature
case "burned_areas" => BurnedAreasFeature
case "gfwpro" => GfwProFeature
case "gfwpro_ext" => GfwProFeatureExt
case value =>
throw new IllegalArgumentException(
s"FeatureType must be one of 'gadm', 'wdpa', 'geostore', 'gfwpro', 'feature', 'viirs', 'modis', or 'burned_areas'. Got $value."
s"FeatureType must be one of 'gadm', 'wdpa', 'geostore', 'gfwpro', 'gfwpro_ext', 'feature', 'viirs', 'modis', or 'burned_areas'. Got $value."
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ object FeatureId {
case "wdpa" => WdpaFeature.getFeatureId(values, true)
case "geostore" => GeostoreFeature.getFeatureId(values, true)
case "gfwpro" => GfwProFeature.getFeatureId(values, true)
case "gfwpro_ext" => GfwProFeatureExt.getFeatureId(values, true)
case "burned_areas" => BurnedAreasFeature.getFeatureId(values, true)
case "viirs" => FireAlertViirsFeature.getFeatureId(values, true)
case "modis" => FireAlertModisFeature.getFeatureId(values, true)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.globalforestwatch.features

// GfwPro Feature that includes commodity and yield columns for GHG analysis.

object GfwProFeatureExt extends Feature {
val listIdPos = 0
val locationIdPos = 1
val geomPos = 2
val commodityPos = 3
val yieldPos = 4

val featureIdExpr = "list_id as listId, cast(location_id as int) as locationId, commodity as commodity, yield as yield"

def getFeatureId(i: Array[String], parsed: Boolean = false): FeatureId = {

val listId: String = i(0)
val locationId: Int = i(1).toInt
val commodity: String = i(2)
val yieldVal: Float = i(3).toFloat

GfwProFeatureExtId(listId, locationId, commodity, yieldVal)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.globalforestwatch.features

// GfwPro FeatureId that includes commodity and yield for GHG analysis.

case class GfwProFeatureExtId(listId: String, locationId: Int, commodity: String, yieldVal: Float) extends FeatureId {
override def toString: String = s"$listId, $locationId, $commodity, $yieldVal"
}
17 changes: 17 additions & 0 deletions src/main/scala/org/globalforestwatch/layers/MapspamYield.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.globalforestwatch.layers

import org.globalforestwatch.grids.GridTile

/** Parameterized layer for all the various Mapspam commodity yield datasets.
* 'commodity' should be one of the four-letter uppercase Mapspam commodity codes,
* such as 'COCO' or 'OILP'.
*/
case class MapspamYield(commodity: String, gridTile: GridTile, kwargs: Map[String, Any])
extends FloatLayer
with OptionalFLayer {

val datasetName = s"mapspam_yield_${commodity.toLowerCase()}"

val uri: String =
uriForGrid(gridTile, kwargs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,32 +102,17 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
)
}

// Split features into those that completely contain the current window
// and those that only partially contain it
val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey)
val (fullWindowFeatures, partialWindowFeatures) = features.partition {
feature =>
try {
feature.geom.contains(windowGeom)
} catch {
case e: org.locationtech.jts.geom.TopologyException =>
// fallback if JTS can't do the intersection because of a wonky geometry,
// just skip the optimization
false
}
}

def getSummaryForGeom(featureIds: List[FEATUREID], geom: Geometry): List[(FEATUREID, ValidatedSummary[SUMMARY])] = {
def getSummaryForGeom(geom: Geometry, mykwargs: Map[String, Any]): ValidatedSummary[SUMMARY] = {
val summary: Either[JobError, PolygonalSummaryResult[SUMMARY]] =
maybeRaster.flatMap { raster =>
Either.catchNonFatal {
runPolygonalSummary(
raster,
geom,
ErrorSummaryRDD.rasterizeOptions,
kwargs)
mykwargs)
}.left.map{
// TODO: these should be moved into left side of PolygonalSummaryResult in GT
case NoYieldException(msg) => NoYieldError(msg)
case ise: java.lang.IllegalStateException =>
GeometryError(s"IllegalStateException")
case te: org.locationtech.jts.geom.TopologyException =>
Expand All @@ -140,31 +125,57 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
}
// Converting to Validated so errors across partial results can be accumulated
// @see https://typelevel.org/cats/datatypes/validated.html#validated-vs-either
featureIds.map { id => (id, summary.toValidated) }
summary.toValidated
}

// for partial windows, we need to calculate summary for each geometry,
// since they all may have unique intersections with the window
val partialWindowResults = partialWindowFeatures.flatMap {
case feature =>
getSummaryForGeom(List(feature.data), feature.geom)
}
if (kwargs.get("includeFeatureId").isDefined) {
// Include the featureId in the kwargs passed to the polygonalSummary
// code. This is to handle the case where the featureId may include
// one or more columns that are used by the analysis. In that case,
// we can't do the optimization where we share fullWindow results
// across features.
val partialSummaries: Array[(FEATUREID, ValidatedSummary[SUMMARY])] =
features.map { feature: Feature[Geometry, FEATUREID] =>
val id: FEATUREID = feature.data
(id, getSummaryForGeom(feature.geom, kwargs + ("featureId" -> id)))
}
partialSummaries
} else {
// Split features into those that completely contain the current window
// and those that only partially contain it
val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey)
val (fullWindowFeatures, partialWindowFeatures) = features.partition {
feature =>
try {
feature.geom.contains(windowGeom)
} catch {
case e: org.locationtech.jts.geom.TopologyException =>
// fallback if JTS can't do the intersection because of a wonky geometry,
// just skip the optimization
false
}
}

// if there are any full window intersections, we only need to calculate
// the summary for the window, and then tie it to each feature ID
val fullWindowIds = fullWindowFeatures.map { case feature => feature.data}.toList
//if (fullWindowIds.size >= 2) {
// println(s"Re-using results from same full tile ${windowKey} for featureIds ${fullWindowIds}")
//}
val fullWindowResults =
if (fullWindowFeatures.nonEmpty) {
getSummaryForGeom(fullWindowIds, windowGeom)
} else {
List.empty
}
// for partial windows, we need to calculate summary for each geometry,
// since they all may have unique intersections with the window
val partialWindowResults = partialWindowFeatures.map {
case feature =>
(feature.data, getSummaryForGeom(feature.geom, kwargs))
}

// combine results
partialWindowResults ++ fullWindowResults
// if there are any full window intersections, we only need to calculate
// the summary for the window, and then tie it to each feature ID
val fullWindowIds = fullWindowFeatures.map { case feature => feature.data}.toList
val fullWindowResults =
if (fullWindowFeatures.nonEmpty) {
fullWindowIds.map { id => (id, getSummaryForGeom(windowGeom, kwargs)) }
} else {
List.empty
}

// combine results
partialWindowResults ++ fullWindowResults
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@ trait JobError

case class RasterReadError(msg: String) extends JobError
case class GeometryError(msg: String) extends JobError

/* Error indicating that the location did not intersect the centroid of any
* raster pixels, hence there are no results. */
case object NoIntersectionError extends JobError

/* Error and exception indicating that no yield could be determined for the specified
* location for use in GHG analysis */
case class NoYieldError(msg: String) extends JobError
case class NoYieldException(msg: String) extends Exception(msg)

case class MultiError(errors: Set[String]) extends JobError {
def addError(err: JobError): MultiError = MultiError(errors + err.toString)
def addError(other: MultiError): MultiError = MultiError(errors ++ other.errors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.globalforestwatch.summarystats.gladalerts.GladAlertsCommand.gladAlert
import org.globalforestwatch.summarystats.treecoverloss.TreeCoverLossCommand.treeCoverLossCommand
import org.globalforestwatch.summarystats.integrated_alerts.IntegratedAlertsCommand.integratedAlertsCommand
import org.globalforestwatch.summarystats.afi.AFiCommand.afiCommand
import org.globalforestwatch.summarystats.ghg.GHGCommand.ghgCommand
import com.monovore.decline._

object SummaryMain {
Expand All @@ -25,16 +26,18 @@ object SummaryMain {
gladAlertsCommand orElse
treeCoverLossCommand orElse
integratedAlertsCommand orElse
afiCommand
afiCommand orElse
ghgCommand
}
val command = Command(name, header, true)(main)

final def main(args: Array[String]): Unit = {
// Print out environment variables (for debugging purposes)
val environmentVars = System.getenv().forEach {
case (key, value) => println(s"$key = $value")
// Print out environment variables (if needed for debugging)
if (false) {
val environmentVars = System.getenv().forEach {
case (key, value) => println(s"$key = $value")
}
}

command.parse(args, sys.env) match {
case Left(help) =>
System.err.println(help)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.globalforestwatch.summarystats.ghg

import cats.data.Validated.{Invalid, Valid}
import cats.implicits._

import geotrellis.vector.{Feature, Geometry}
import org.locationtech.jts.geom.Geometry
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.globalforestwatch.summarystats.{Location, NoIntersectionError, SummaryAnalysis, ValidatedLocation}
import org.apache.spark.storage.StorageLevel
import org.globalforestwatch.ValidatedWorkflow

object GHGAnalysis extends SummaryAnalysis {

val name = "ghg"

/** Greenhouse gas analysis of input features in a TSV file. The TSV file contains
* the individual list items (location IDs >= 0) and optional merged ("dissolved")
* list geometries (location id -1).
*
* This function assumes that all features have already been split by 1x1 degree
* grid, so each location and merged list may have a single or multiple rows.
*/
def apply(
features: RDD[ValidatedLocation[Geometry]],
kwargs: Map[String, Any]
)(implicit spark: SparkSession): RDD[ValidatedLocation[GHGData]] = {
features.persist(StorageLevel.MEMORY_AND_DISK)

try {
val partialResult: RDD[ValidatedLocation[GHGData]] = {
ValidatedWorkflow(features)
.flatMap { locationGeometries =>
val locationSummaries: RDD[ValidatedLocation[GHGSummary]] = {
val tmp = locationGeometries.map { case Location(id, geom) => Feature(geom, id) }

// This is where the main analysis happens, in ErrorSummaryRDD.apply(),
// which eventually calls into GHGSummary via runPolygonalSummary().
GHGRDD(tmp, GHGGrid.blockTileGrid, kwargs)
}

// For all rows that didn't get an error from the GHG analysis, do the
// transformation from GHGSummary to GHGData
ValidatedWorkflow(locationSummaries).mapValid { summaries =>
summaries
.mapValues {
case summary: GHGSummary =>
val data = summary.toGHGData()
data
}
}
}
.unify
.persist(StorageLevel.MEMORY_AND_DISK)
}

// If a location has empty GHGData results, then the geometry
// must not have intersected the centroid of any pixels, so report the location
// as NoIntersectionError.
partialResult.map {
case Valid(Location(fid, data)) if data.equals(GHGData.empty) =>
Invalid(Location(fid, NoIntersectionError))
case data => data
}
} catch {
case e: StackOverflowError =>
e.printStackTrace()
throw e
}
}
}
Loading

0 comments on commit 9af31db

Please sign in to comment.