Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GTC-3055 Added new GHG (green-house gas) analysis #252

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/main/resources/raster-catalog-pro.json
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,38 @@
{
"name":"col_frontera_agricola",
"source_uri":"s3://gfw-data-lake/col_frontera_agricola/v2024/raster/epsg-4326/{grid_size}/{row_count}/category/gdal-geotiff/{tile_id}.tif"
},
{
"name":"gfw_forest_flux_full_extent_gross_emissions_co2_only_biomass_soil",
"source_uri": "s3://gfw-data-lake/gfw_forest_flux_full_extent_gross_emissions_co2_only_biomass_soil/v20240402/raster/epsg-4326/{grid_size}/{row_count}/Mg_CO2_ha-1/geotiff/{tile_id}.tif"
},
{
"name":"gfw_forest_flux_full_extent_gross_emissions_non_co2_biomass_soil",
"source_uri": "s3://gfw-data-lake/gfw_forest_flux_full_extent_gross_emissions_non_co2_biomass_soil/v20240402/raster/epsg-4326/{grid_size}/{row_count}/Mg_CO2e_ha-1/geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_coco",
"source_uri":"s3://gfw-data-lake/mapspam_yield_coco/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_coff",
"source_uri":"s3://gfw-data-lake/mapspam_yield_coff/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_oilp",
"source_uri":"s3://gfw-data-lake/mapspam_yield_oilp/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_rubb",
"source_uri":"s3://gfw-data-lake/mapspam_yield_rubb/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_soyb",
"source_uri":"s3://gfw-data-lake/mapspam_yield_soyb/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
},
{
"name":"mapspam_yield_sugc",
"source_uri":"s3://gfw-data-lake/mapspam_yield_sugc/v2020/raster/epsg-4326/{grid_size}/{row_count}/yield/gdal-geotiff/{tile_id}.tif"
}
]
}
5 changes: 3 additions & 2 deletions src/main/scala/org/globalforestwatch/features/Feature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ object Feature {
case "modis" => FireAlertModisFeature
case "burned_areas" => BurnedAreasFeature
case "gfwpro" => GfwProFeature
case "gfwpro_ext" => GfwProFeatureExt
case value =>
throw new IllegalArgumentException(
s"FeatureType must be one of 'gadm', 'wdpa', 'geostore', 'gfwpro', 'feature', 'viirs', 'modis', or 'burned_areas'. Got $value."
s"FeatureType must be one of 'gadm', 'wdpa', 'geostore', 'gfwpro', 'gfwpro_ext', 'feature', 'viirs', 'modis', or 'burned_areas'. Got $value."
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ object FeatureId {
case "wdpa" => WdpaFeature.getFeatureId(values, true)
case "geostore" => GeostoreFeature.getFeatureId(values, true)
case "gfwpro" => GfwProFeature.getFeatureId(values, true)
case "gfwpro_ext" => GfwProFeatureExt.getFeatureId(values, true)
case "burned_areas" => BurnedAreasFeature.getFeatureId(values, true)
case "viirs" => FireAlertViirsFeature.getFeatureId(values, true)
case "modis" => FireAlertModisFeature.getFeatureId(values, true)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.globalforestwatch.features

// GfwPro Feature that includes commodity and yield columns for GHG analysis.

object GfwProFeatureExt extends Feature {
val listIdPos = 0
val locationIdPos = 1
val geomPos = 2
val commodityPos = 3
val yieldPos = 4

val featureIdExpr = "list_id as listId, cast(location_id as int) as locationId, commodity as commodity, yield as yield"

def getFeatureId(i: Array[String], parsed: Boolean = false): FeatureId = {

val listId: String = i(0)
val locationId: Int = i(1).toInt
val commodity: String = i(2)
val yieldVal: Float = i(3).toFloat

GfwProFeatureExtId(listId, locationId, commodity, yieldVal)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.globalforestwatch.features

// GfwPro FeatureId that includes commodity and yield for GHG analysis.

case class GfwProFeatureExtId(listId: String, locationId: Int, commodity: String, yieldVal: Float) extends FeatureId {
override def toString: String = s"$listId, $locationId, $commodity, $yieldVal"
}
17 changes: 17 additions & 0 deletions src/main/scala/org/globalforestwatch/layers/MapspamYield.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.globalforestwatch.layers

import org.globalforestwatch.grids.GridTile

/** Parameterized layer for all the various Mapspam commodity yield datasets.
* 'commodity' should be one of the four-letter uppercase Mapspam commodity codes,
* such as 'COCO' or 'OILP'.
*/
case class MapspamYield(commodity: String, gridTile: GridTile, kwargs: Map[String, Any])
extends FloatLayer
with OptionalFLayer {

val datasetName = s"mapspam_yield_${commodity.toLowerCase()}"

val uri: String =
uriForGrid(gridTile, kwargs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,32 +102,17 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
)
}

// Split features into those that completely contain the current window
// and those that only partially contain it
val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey)
val (fullWindowFeatures, partialWindowFeatures) = features.partition {
feature =>
try {
feature.geom.contains(windowGeom)
} catch {
case e: org.locationtech.jts.geom.TopologyException =>
// fallback if JTS can't do the intersection because of a wonky geometry,
// just skip the optimization
false
}
}

def getSummaryForGeom(featureIds: List[FEATUREID], geom: Geometry): List[(FEATUREID, ValidatedSummary[SUMMARY])] = {
def getSummaryForGeom(geom: Geometry, mykwargs: Map[String, Any]): ValidatedSummary[SUMMARY] = {
val summary: Either[JobError, PolygonalSummaryResult[SUMMARY]] =
maybeRaster.flatMap { raster =>
Either.catchNonFatal {
runPolygonalSummary(
raster,
geom,
ErrorSummaryRDD.rasterizeOptions,
kwargs)
mykwargs)
}.left.map{
// TODO: these should be moved into left side of PolygonalSummaryResult in GT
case NoYieldException(msg) => NoYieldError(msg)
case ise: java.lang.IllegalStateException =>
GeometryError(s"IllegalStateException")
case te: org.locationtech.jts.geom.TopologyException =>
Expand All @@ -140,31 +125,57 @@ trait ErrorSummaryRDD extends LazyLogging with java.io.Serializable {
}
// Converting to Validated so errors across partial results can be accumulated
// @see https://typelevel.org/cats/datatypes/validated.html#validated-vs-either
featureIds.map { id => (id, summary.toValidated) }
summary.toValidated
}

// for partial windows, we need to calculate summary for each geometry,
// since they all may have unique intersections with the window
val partialWindowResults = partialWindowFeatures.flatMap {
case feature =>
getSummaryForGeom(List(feature.data), feature.geom)
}
if (kwargs.get("includeFeatureId").isDefined) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this would be messy, but if you wanted to keep the optimization and not split the logic, theoretically every tile should produce the same results per commodity unless the user specifies the yield manually. In which case, couldn't you just apply the yield constant to your summary result per feature after doing runPolygonalSummary?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is analysis-independent code, so I would not want to put anything in here that is specific to yield/commodity/GHG, etc.

Since GHG specifies a commodity or yield, it must be done on specific farms with specific crops. So, it will not be called on very large areas (which would not all be one farm with one kind of crop). So, the full window optimization would never actually be applicable for GHG anyway.

So, I don't think it would be worth trying to get this optimization to work in a general way in the case the featureId is passed down, since it would never be used in the one case (GHG) that we have.

// Include the featureId in the kwargs passed to the polygonalSummary
// code. This is to handle the case where the featureId may include
// one or more columns that are used by the analysis. In that case,
// we can't do the optimization where we share fullWindow results
// across features.
val partialSummaries: Array[(FEATUREID, ValidatedSummary[SUMMARY])] =
features.map { feature: Feature[Geometry, FEATUREID] =>
val id: FEATUREID = feature.data
(id, getSummaryForGeom(feature.geom, kwargs + ("featureId" -> id)))
}
partialSummaries
} else {
// Split features into those that completely contain the current window
// and those that only partially contain it
val windowGeom: Extent = windowLayout.mapTransform.keyToExtent(windowKey)
val (fullWindowFeatures, partialWindowFeatures) = features.partition {
feature =>
try {
feature.geom.contains(windowGeom)
} catch {
case e: org.locationtech.jts.geom.TopologyException =>
// fallback if JTS can't do the intersection because of a wonky geometry,
// just skip the optimization
false
}
}

// if there are any full window intersections, we only need to calculate
// the summary for the window, and then tie it to each feature ID
val fullWindowIds = fullWindowFeatures.map { case feature => feature.data}.toList
//if (fullWindowIds.size >= 2) {
// println(s"Re-using results from same full tile ${windowKey} for featureIds ${fullWindowIds}")
//}
val fullWindowResults =
if (fullWindowFeatures.nonEmpty) {
getSummaryForGeom(fullWindowIds, windowGeom)
} else {
List.empty
}
// for partial windows, we need to calculate summary for each geometry,
// since they all may have unique intersections with the window
val partialWindowResults = partialWindowFeatures.map {
case feature =>
(feature.data, getSummaryForGeom(feature.geom, kwargs))
}

// combine results
partialWindowResults ++ fullWindowResults
// if there are any full window intersections, we only need to calculate
// the summary for the window, and then tie it to each feature ID
val fullWindowIds = fullWindowFeatures.map { case feature => feature.data}.toList
val fullWindowResults =
if (fullWindowFeatures.nonEmpty) {
fullWindowIds.map { id => (id, getSummaryForGeom(windowGeom, kwargs)) }
} else {
List.empty
}

// combine results
partialWindowResults ++ fullWindowResults
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@ trait JobError

case class RasterReadError(msg: String) extends JobError
case class GeometryError(msg: String) extends JobError

/* Error indicating that the location did not intersect the centroid of any
* raster pixels, hence there are no results. */
case object NoIntersectionError extends JobError

/* Error and exception indicating that no yield could be determined for the specified
* location for use in GHG analysis */
case class NoYieldError(msg: String) extends JobError
case class NoYieldException(msg: String) extends Exception(msg)

case class MultiError(errors: Set[String]) extends JobError {
def addError(err: JobError): MultiError = MultiError(errors + err.toString)
def addError(other: MultiError): MultiError = MultiError(errors ++ other.errors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.globalforestwatch.summarystats.gladalerts.GladAlertsCommand.gladAlert
import org.globalforestwatch.summarystats.treecoverloss.TreeCoverLossCommand.treeCoverLossCommand
import org.globalforestwatch.summarystats.integrated_alerts.IntegratedAlertsCommand.integratedAlertsCommand
import org.globalforestwatch.summarystats.afi.AFiCommand.afiCommand
import org.globalforestwatch.summarystats.ghg.GHGCommand.ghgCommand
import com.monovore.decline._

object SummaryMain {
Expand All @@ -25,16 +26,18 @@ object SummaryMain {
gladAlertsCommand orElse
treeCoverLossCommand orElse
integratedAlertsCommand orElse
afiCommand
afiCommand orElse
ghgCommand
}
val command = Command(name, header, true)(main)

final def main(args: Array[String]): Unit = {
// Print out environment variables (for debugging purposes)
val environmentVars = System.getenv().forEach {
case (key, value) => println(s"$key = $value")
// Print out environment variables (if needed for debugging)
if (false) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this meant to be here still?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I left the code in there (with 'if (false)') in case sometime later we want to print out environment variables for debugging purposes.

val environmentVars = System.getenv().forEach {
case (key, value) => println(s"$key = $value")
}
}

command.parse(args, sys.env) match {
case Left(help) =>
System.err.println(help)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.globalforestwatch.summarystats.ghg

import cats.data.Validated.{Invalid, Valid}
import cats.implicits._

import geotrellis.vector.{Feature, Geometry}
import org.locationtech.jts.geom.Geometry
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.globalforestwatch.summarystats.{Location, NoIntersectionError, SummaryAnalysis, ValidatedLocation}
import org.apache.spark.storage.StorageLevel
import org.globalforestwatch.ValidatedWorkflow

object GHGAnalysis extends SummaryAnalysis {

val name = "ghg"

/** Greenhouse gas analysis of input features in a TSV file. The TSV file contains
* the individual list items (location IDs >= 0) and optional merged ("dissolved")
* list geometries (location id -1).
*
* This function assumes that all features have already been split by 1x1 degree
* grid, so each location and merged list may have a single or multiple rows.
*/
def apply(
features: RDD[ValidatedLocation[Geometry]],
kwargs: Map[String, Any]
)(implicit spark: SparkSession): RDD[ValidatedLocation[GHGData]] = {
features.persist(StorageLevel.MEMORY_AND_DISK)

try {
val partialResult: RDD[ValidatedLocation[GHGData]] = {
ValidatedWorkflow(features)
.flatMap { locationGeometries =>
val locationSummaries: RDD[ValidatedLocation[GHGSummary]] = {
val tmp = locationGeometries.map { case Location(id, geom) => Feature(geom, id) }

// This is where the main analysis happens, in ErrorSummaryRDD.apply(),
// which eventually calls into GHGSummary via runPolygonalSummary().
GHGRDD(tmp, GHGGrid.blockTileGrid, kwargs)
}

// For all rows that didn't get an error from the GHG analysis, do the
// transformation from GHGSummary to GHGData
ValidatedWorkflow(locationSummaries).mapValid { summaries =>
summaries
.mapValues {
case summary: GHGSummary =>
val data = summary.toGHGData()
data
}
}
}
.unify
.persist(StorageLevel.MEMORY_AND_DISK)
}

// If a location has empty GHGData results, then the geometry
// must not have intersected the centroid of any pixels, so report the location
// as NoIntersectionError.
partialResult.map {
case Valid(Location(fid, data)) if data.equals(GHGData.empty) =>
Invalid(Location(fid, NoIntersectionError))
case data => data
}
} catch {
case e: StackOverflowError =>
e.printStackTrace()
throw e
}
}
}
Loading
Loading