GTC-3055 Added new GHG (green-house gas) analysis #252
Changes from 1 commit
@@ -0,0 +1,23 @@
package org.globalforestwatch.features

// GfwPro Feature that includes commodity and yield columns for GHG analysis.

object GfwProFeatureExt extends Feature {
  val listIdPos = 0
  val locationIdPos = 1
  val geomPos = 2
  val commodityPos = 3
  val yieldPos = 4

  val featureIdExpr = "list_id as listId, cast(location_id as int) as locationId, commodity as commodity, yield as yield"

  def getFeatureId(i: Array[String], parsed: Boolean = false): FeatureId = {
    val listId: String = i(0)
    val locationId: Int = i(1).toInt
    val commodity: String = i(2)
    val yieldVal: Float = i(3).toFloat

    GfwProFeatureExtId(listId, locationId, commodity, yieldVal)
  }
}
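For clarity, a minimal usage sketch of getFeatureId (the input values below are illustrative only, not taken from this PR):

  // Parse one row of feature-id columns (list_id, location_id, commodity, yield) into a FeatureId.
  val id = GfwProFeatureExt.getFeatureId(Array("my_list", "7", "COCO", "1.25"))
  // id == GfwProFeatureExtId("my_list", 7, "COCO", 1.25f)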
@@ -0,0 +1,7 @@
package org.globalforestwatch.features

// GfwPro FeatureId that includes commodity and yield for GHG analysis.

case class GfwProFeatureExtId(listId: String, locationId: Int, commodity: String, yieldVal: Float) extends FeatureId {
  override def toString: String = s"$listId, $locationId, $commodity, $yieldVal"
}
@@ -0,0 +1,17 @@
package org.globalforestwatch.layers

import org.globalforestwatch.grids.GridTile

/** Parameterized layer for all the various Mapspam commodity yield datasets.
  * 'commodity' should be one of the four-letter uppercase Mapspam commodity codes,
  * such as 'COCO' or 'OILP'.
  */
case class MapspamYield(commodity: String, gridTile: GridTile, kwargs: Map[String, Any])
  extends FloatLayer
    with OptionalFLayer {

  val datasetName = s"mapspam_yield_${commodity.toLowerCase()}"

  val uri: String =
    uriForGrid(gridTile, kwargs)
}
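A small usage sketch of the parameterized layer (it assumes a GridTile and kwargs are already in scope); the commodity code simply selects the underlying dataset name:

  // "OILP" (oil palm) resolves to the "mapspam_yield_oilp" dataset.
  val oilPalmYield = MapspamYield("OILP", gridTile, kwargs)
  // oilPalmYield.datasetName == "mapspam_yield_oilp"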
@@ -102,32 +102,17 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
          )
        }

        // Split features into those that completely contain the current window
        // and those that only partially contain it
        val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey)
        val (fullWindowFeatures, partialWindowFeatures) = features.partition {
          feature =>
            try {
              feature.geom.contains(windowGeom)
            } catch {
              case e: org.locationtech.jts.geom.TopologyException =>
                // fallback if JTS can't do the intersection because of a wonky geometry,
                // just skip the optimization
                false
            }
        }

        def getSummaryForGeom(featureIds: List[FEATUREID], geom: Geometry): List[(FEATUREID, ValidatedSummary[SUMMARY])] = {
        def getSummaryForGeom(geom: Geometry, mykwargs: Map[String, Any]): ValidatedSummary[SUMMARY] = {
          val summary: Either[JobError, PolygonalSummaryResult[SUMMARY]] =
            maybeRaster.flatMap { raster =>
              Either.catchNonFatal {
                runPolygonalSummary(
                  raster,
                  geom,
                  ErrorSummaryRDD.rasterizeOptions,
                  kwargs)
                  mykwargs)
              }.left.map {
                // TODO: these should be moved into left side of PolygonalSummaryResult in GT
                case NoYieldException(msg) => NoYieldError(msg)
                case ise: java.lang.IllegalStateException =>
                  GeometryError(s"IllegalStateException")
                case te: org.locationtech.jts.geom.TopologyException =>
@@ -140,31 +125,60 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
            }
            // Converting to Validated so errors across partial results can be accumulated
            // @see https://typelevel.org/cats/datatypes/validated.html#validated-vs-either
            featureIds.map { id => (id, summary.toValidated) }
            summary.toValidated
          }

          // for partial windows, we need to calculate summary for each geometry,
          // since they all may have unique intersections with the window
          val partialWindowResults = partialWindowFeatures.flatMap {
            case feature =>
              getSummaryForGeom(List(feature.data), feature.geom)
          }
          if (kwargs.get("includeFeatureId").isDefined) {
            // Include the featureId in the kwargs passed to the polygonalSummary
            // code. This is to handle the case where the featureId may include
            // one or more columns that are used by the analysis. In that case,
            // we can't do the optimization where we share fullWindow results
            // across features.
            val partialSummaries: Array[(FEATUREID, ValidatedSummary[SUMMARY])] =
              features.map { feature: Feature[Geometry, FEATUREID] =>
                val id: FEATUREID = feature.data
                (id, getSummaryForGeom(feature.geom, kwargs + ("featureId" -> id)))
              }
            partialSummaries
          } else {
            // Split features into those that completely contain the current window
            // and those that only partially contain it
            val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey)
            val (fullWindowFeatures, partialWindowFeatures) = features.partition {
              feature =>
                try {
                  feature.geom.contains(windowGeom)
                } catch {
                  case e: org.locationtech.jts.geom.TopologyException =>
                    // fallback if JTS can't do the intersection because of a wonky geometry,
                    // just skip the optimization
                    false
                }
            }

            // if there are any full window intersections, we only need to calculate
            // the summary for the window, and then tie it to each feature ID
            val fullWindowIds = fullWindowFeatures.map { case feature => feature.data }.toList
            //if (fullWindowIds.size >= 2) {
            //  println(s"Re-using results from same full tile ${windowKey} for featureIds ${fullWindowIds}")
            //}
            val fullWindowResults =
              if (fullWindowFeatures.nonEmpty) {
                getSummaryForGeom(fullWindowIds, windowGeom)
              } else {
                List.empty
              }
            // for partial windows, we need to calculate summary for each geometry,
            // since they all may have unique intersections with the window
            val partialWindowResults = partialWindowFeatures.map {
              case feature =>
                (feature.data, getSummaryForGeom(feature.geom, kwargs))
            }

            // combine results
            partialWindowResults ++ fullWindowResults
            // if there are any full window intersections, we only need to calculate
            // the summary for the window, and then tie it to each feature ID
            val fullWindowIds = fullWindowFeatures.map { case feature => feature.data }.toList
            //if (fullWindowIds.size >= 2) {
Review comment: Is this comment meant to be here still?
Reply: Removed it. Left over from a while ago.
            //  println(s"Re-using results from same full tile ${windowKey} for featureIds ${fullWindowIds}")
            //}
            val fullWindowResults =
              if (fullWindowFeatures.nonEmpty) {
                fullWindowIds.map { id => (id, getSummaryForGeom(windowGeom, kwargs)) }
              } else {
                List.empty
              }

            // combine results
            partialWindowResults ++ fullWindowResults
          }
        }
      }
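As a rough illustration of the new includeFeatureId path: analysis-specific summary code can read the feature id back out of the kwargs it receives, since full-window results are no longer shared across features in that case. The cast target and field access below are assumptions for illustration, not code from this PR:

  // Hypothetical sketch: recover the per-feature id (and its yield) inside the
  // polygonal summary code from the kwargs populated by ErrorSummaryRDD.
  val featureId = kwargs("featureId").asInstanceOf[GfwProFeatureExtId]
  val yieldVal: Float = featureId.yieldVal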
@@ -10,6 +10,7 @@ import org.globalforestwatch.summarystats.gladalerts.GladAlertsCommand.gladAlert
import org.globalforestwatch.summarystats.treecoverloss.TreeCoverLossCommand.treeCoverLossCommand
import org.globalforestwatch.summarystats.integrated_alerts.IntegratedAlertsCommand.integratedAlertsCommand
import org.globalforestwatch.summarystats.afi.AFiCommand.afiCommand
import org.globalforestwatch.summarystats.ghg.GHGCommand.ghgCommand
import com.monovore.decline._

object SummaryMain {

@@ -25,16 +26,18 @@ object SummaryMain {
      gladAlertsCommand orElse
      treeCoverLossCommand orElse
      integratedAlertsCommand orElse
      afiCommand
      afiCommand orElse
      ghgCommand
  }
  val command = Command(name, header, true)(main)

  final def main(args: Array[String]): Unit = {
    // Print out environment variables (for debugging purposes)
    val environmentVars = System.getenv().forEach {
      case (key, value) => println(s"$key = $value")
    // Print out environment variables (if needed for debugging)
    if (false) {
Review comment: Is this meant to be here still?
Reply: Yes, I left the code in there (with 'if (false)') in case we later want to print out environment variables for debugging.
      val environmentVars = System.getenv().forEach {
        case (key, value) => println(s"$key = $value")
      }
    }

    command.parse(args, sys.env) match {
      case Left(help) =>
        System.err.println(help)
@@ -0,0 +1,72 @@
package org.globalforestwatch.summarystats.ghg

import cats.data.Validated.{Invalid, Valid}
import cats.implicits._

import geotrellis.vector.{Feature, Geometry}
import org.locationtech.jts.geom.Geometry
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.globalforestwatch.summarystats.{Location, NoIntersectionError, SummaryAnalysis, ValidatedLocation}
import org.apache.spark.storage.StorageLevel
import org.globalforestwatch.ValidatedWorkflow

object GHGAnalysis extends SummaryAnalysis {

  val name = "ghg"

  /** Greenhouse gas analysis of input features in a TSV file. The TSV file contains
    * the individual list items (location IDs >= 0) and optional merged ("dissolved")
    * list geometries (location id -1).
    *
    * This function assumes that all features have already been split by 1x1 degree
    * grid, so each location and merged list may have a single or multiple rows.
    */
  def apply(
    features: RDD[ValidatedLocation[Geometry]],
    kwargs: Map[String, Any]
  )(implicit spark: SparkSession): RDD[ValidatedLocation[GHGData]] = {
    features.persist(StorageLevel.MEMORY_AND_DISK)

    try {
      val partialResult: RDD[ValidatedLocation[GHGData]] = {
        ValidatedWorkflow(features)
          .flatMap { locationGeometries =>
            val locationSummaries: RDD[ValidatedLocation[GHGSummary]] = {
              val tmp = locationGeometries.map { case Location(id, geom) => Feature(geom, id) }

              // This is where the main analysis happens, in ErrorSummaryRDD.apply(),
              // which eventually calls into GHGSummary via runPolygonalSummary().
              GHGRDD(tmp, GHGGrid.blockTileGrid, kwargs)
            }

            // For all rows that didn't get an error from the GHG analysis, do the
            // transformation from GHGSummary to GHGData
            ValidatedWorkflow(locationSummaries).mapValid { summaries =>
              summaries
                .mapValues {
                  case summary: GHGSummary =>
                    val data = summary.toGHGData()
                    data
                }
            }
          }
          .unify
          .persist(StorageLevel.MEMORY_AND_DISK)
      }

      // If a location has empty GHGData results, then the geometry
      // must not have intersected the centroid of any pixels, so report the location
      // as NoIntersectionError.
      partialResult.map {
        case Valid(Location(fid, data)) if data.equals(GHGData.empty) =>
          Invalid(Location(fid, NoIntersectionError))
        case data => data
      }
    } catch {
      case e: StackOverflowError =>
        e.printStackTrace()
        throw e
    }
  }
}
Review comment: Maybe this would be messy, but if you wanted to keep the optimization and not split the logic, theoretically every tile should produce the same results per commodity unless the user specifies the yield manually. In that case, couldn't you just apply the yield constant to your summary result per feature after doing runPolygonalSummary?

Reply: This is analysis-independent code, so I would not want to put anything in here that is specific to yield/commodity/GHG, etc.
Since GHG specifies a commodity or yield, it must be done on specific farms with specific crops, so it will not be called on very large areas (which would not all be one farm with one kind of crop). The full-window optimization would therefore never actually be applicable for GHG anyway.
So I don't think it would be worth trying to get this optimization to work in a general way for the case where the featureId is passed down, since it would never be used in the one case (GHG) that we have.
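For reference, a rough sketch of the alternative the reviewer describes, under the assumption that the yield could be applied to a shared result after the fact (applyYield and yieldOf are hypothetical names, not part of this PR):

  // Run the polygonal summary once per full window, then scale each feature's
  // copy of the shared result by that feature's own yield value.
  val windowSummary = getSummaryForGeom(windowGeom, kwargs)
  val fullWindowResults = fullWindowIds.map { id =>
    (id, windowSummary.map(s => applyYield(s, yieldOf(id))))
  }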